1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::SocketAddr,
7 num::{NonZeroU32, NonZeroUsize},
8 sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{RngExt, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22 TransportError, TransportErrorCode, VarInt,
23 cid_generator::ConnectionIdGenerator,
24 cid_queue::CidQueue,
25 config::{ServerConfig, TransportConfig},
26 congestion::Controller,
27 connection::{
28 qlog::{QlogRecvPacket, QlogSink},
29 spaces::LostPacket,
30 stats::PathStatsMap,
31 timer::{ConnTimer, PathTimer},
32 },
33 crypto::{self, Keys},
34 frame::{
35 self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
36 StreamsBlocked,
37 },
38 n0_nat_traversal,
39 packet::{
40 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
41 PacketNumber, PartialDecode, SpaceId,
42 },
43 range_set::ArrayRangeSet,
44 shared::{
45 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
46 EndpointEvent, EndpointEventInner,
47 },
48 token::{ResetToken, Token, TokenPayload},
49 transport_parameters::TransportParameters,
50};
51
52mod ack_frequency;
53use ack_frequency::AckFrequencyState;
54
55mod assembler;
56pub use assembler::Chunk;
57
58mod cid_state;
59use cid_state::CidState;
60
61mod datagrams;
62use datagrams::DatagramState;
63pub use datagrams::{Datagrams, SendDatagramError};
64
65mod mtud;
66mod pacing;
67
68mod packet_builder;
69use packet_builder::{PacketBuilder, PadDatagram};
70
71mod packet_crypto;
72use packet_crypto::CryptoState;
73pub(crate) use packet_crypto::EncryptionLevel;
74
75mod paths;
76pub use paths::{
77 ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
78};
79use paths::{PathData, PathState};
80
81pub(crate) mod qlog;
82pub(crate) mod send_buffer;
83
84pub(crate) mod spaces;
85#[cfg(fuzzing)]
86pub use spaces::Retransmits;
87#[cfg(not(fuzzing))]
88use spaces::Retransmits;
89pub(crate) use spaces::SpaceKind;
90use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
91
92mod stats;
93pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
94
95mod streams;
96#[cfg(fuzzing)]
97pub use streams::StreamsState;
98#[cfg(not(fuzzing))]
99use streams::StreamsState;
100pub use streams::{
101 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
102 ShouldTransmit, StreamEvent, Streams, WriteError,
103};
104
105mod timer;
106use timer::{Timer, TimerTable};
107
108mod transmit_buf;
109use transmit_buf::TransmitBuf;
110
111mod state;
112
113#[cfg(not(fuzzing))]
114use state::State;
115#[cfg(fuzzing)]
116pub use state::State;
117use state::StateType;
118
119pub struct Connection {
159 endpoint_config: Arc<EndpointConfig>,
160 config: Arc<TransportConfig>,
161 rng: StdRng,
162 crypto_state: CryptoState,
164 handshake_cid: ConnectionId,
166 remote_handshake_cid: ConnectionId,
168 paths: BTreeMap<PathId, PathState>,
174 path_generation_counter: u64,
185 allow_mtud: bool,
187 state: State,
188 side: ConnectionSide,
189 peer_params: TransportParameters,
191 original_remote_cid: ConnectionId,
193 initial_dst_cid: ConnectionId,
195 retry_src_cid: Option<ConnectionId>,
198 events: VecDeque<Event>,
200 endpoint_events: VecDeque<EndpointEventInner>,
201 spin_enabled: bool,
203 spin: bool,
205 spaces: [PacketSpace; 3],
207 highest_space: SpaceKind,
209 idle_timeout: Option<Duration>,
211 timers: TimerTable,
212 authentication_failures: u64,
214
215 connection_close_pending: bool,
220
221 ack_frequency: AckFrequencyState,
225
226 receiving_ecn: bool,
231 total_authed_packets: u64,
233
234 next_observed_addr_seq_no: VarInt,
239
240 streams: StreamsState,
241 remote_cids: FxHashMap<PathId, CidQueue>,
247 local_cid_state: FxHashMap<PathId, CidState>,
254 datagrams: DatagramState,
256 path_stats: PathStatsMap,
258 partial_stats: ConnectionStats,
264 version: u32,
266
267 max_concurrent_paths: NonZeroU32,
276 local_max_path_id: PathId,
291 remote_max_path_id: PathId,
297 max_path_id_with_cids: PathId,
303 abandoned_paths: FxHashSet<PathId>,
311
312 n0_nat_traversal: n0_nat_traversal::State,
314 qlog: QlogSink,
315}
316
317impl Connection {
318 pub(crate) fn new(
319 endpoint_config: Arc<EndpointConfig>,
320 config: Arc<TransportConfig>,
321 init_cid: ConnectionId,
322 local_cid: ConnectionId,
323 remote_cid: ConnectionId,
324 network_path: FourTuple,
325 crypto: Box<dyn crypto::Session>,
326 cid_gen: &dyn ConnectionIdGenerator,
327 now: Instant,
328 version: u32,
329 allow_mtud: bool,
330 rng_seed: [u8; 32],
331 side_args: SideArgs,
332 qlog: QlogSink,
333 ) -> Self {
334 let pref_addr_cid = side_args.pref_addr_cid();
335 let path_validated = side_args.path_validated();
336 let connection_side = ConnectionSide::from(side_args);
337 let side = connection_side.side();
338 let mut rng = StdRng::from_seed(rng_seed);
339 let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
340 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
341 #[cfg(test)]
342 let data_space = match config.deterministic_packet_numbers {
343 true => PacketSpace::new_deterministic(now, SpaceId::Data),
344 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
345 };
346 #[cfg(not(test))]
347 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
348 let state = State::handshake(state::Handshake {
349 remote_cid_set: side.is_server(),
350 expected_token: Bytes::new(),
351 client_hello: None,
352 allow_server_migration: side.is_client(),
353 });
354 let local_cid_state = FxHashMap::from_iter([(
355 PathId::ZERO,
356 CidState::new(
357 cid_gen.cid_len(),
358 cid_gen.cid_lifetime(),
359 now,
360 if pref_addr_cid.is_some() { 2 } else { 1 },
361 ),
362 )]);
363
364 let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
365 path.open_status = paths::OpenStatus::Informed;
366 let mut this = Self {
367 endpoint_config,
368 crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
369 handshake_cid: local_cid,
370 remote_handshake_cid: remote_cid,
371 local_cid_state,
372 paths: BTreeMap::from_iter([(
373 PathId::ZERO,
374 PathState {
375 data: path,
376 prev: None,
377 },
378 )]),
379 path_generation_counter: 0,
380 allow_mtud,
381 state,
382 side: connection_side,
383 peer_params: TransportParameters::default(),
384 original_remote_cid: remote_cid,
385 initial_dst_cid: init_cid,
386 retry_src_cid: None,
387 events: VecDeque::new(),
388 endpoint_events: VecDeque::new(),
389 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
390 spin: false,
391 spaces: [initial_space, handshake_space, data_space],
392 highest_space: SpaceKind::Initial,
393 idle_timeout: match config.max_idle_timeout {
394 None | Some(VarInt(0)) => None,
395 Some(dur) => Some(Duration::from_millis(dur.0)),
396 },
397 timers: TimerTable::default(),
398 authentication_failures: 0,
399 connection_close_pending: false,
400
401 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
402 &TransportParameters::default(),
403 )),
404
405 receiving_ecn: false,
406 total_authed_packets: 0,
407
408 next_observed_addr_seq_no: 0u32.into(),
409
410 streams: StreamsState::new(
411 side,
412 config.max_concurrent_uni_streams,
413 config.max_concurrent_bidi_streams,
414 config.send_window,
415 config.receive_window,
416 config.stream_receive_window,
417 ),
418 datagrams: DatagramState::default(),
419 config,
420 remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
421 rng,
422 path_stats: Default::default(),
423 partial_stats: ConnectionStats::default(),
424 version,
425
426 max_concurrent_paths: NonZeroU32::MIN,
428 local_max_path_id: PathId::ZERO,
429 remote_max_path_id: PathId::ZERO,
430 max_path_id_with_cids: PathId::ZERO,
431 abandoned_paths: Default::default(),
432
433 n0_nat_traversal: Default::default(),
434 qlog,
435 };
436 if path_validated {
437 this.on_path_validated(PathId::ZERO);
438 }
439 if side.is_client() {
440 this.write_crypto();
442 this.init_0rtt(now);
443 }
444 this.qlog
445 .emit_tuple_assigned(PathId::ZERO, network_path, now);
446 this
447 }
448
449 #[must_use]
457 pub fn poll_timeout(&mut self) -> Option<Instant> {
458 self.timers.peek()
459 }
460
461 #[must_use]
467 pub fn poll(&mut self) -> Option<Event> {
468 if let Some(x) = self.events.pop_front() {
469 return Some(x);
470 }
471
472 if let Some(event) = self.streams.poll() {
473 return Some(Event::Stream(event));
474 }
475
476 if let Some(reason) = self.state.take_error() {
477 return Some(Event::ConnectionLost { reason });
478 }
479
480 None
481 }
482
483 #[must_use]
485 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
486 self.endpoint_events.pop_front().map(EndpointEvent)
487 }
488
489 #[must_use]
491 pub fn streams(&mut self) -> Streams<'_> {
492 Streams {
493 state: &mut self.streams,
494 conn_state: &self.state,
495 }
496 }
497
498 #[must_use]
500 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
501 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
502 RecvStream {
503 id,
504 state: &mut self.streams,
505 pending: &mut self.spaces[SpaceId::Data].pending,
506 }
507 }
508
509 #[must_use]
511 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
512 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
513 SendStream {
514 id,
515 state: &mut self.streams,
516 pending: &mut self.spaces[SpaceId::Data].pending,
517 conn_state: &self.state,
518 }
519 }
520
521 pub fn open_path_ensure(
538 &mut self,
539 network_path: FourTuple,
540 initial_status: PathStatus,
541 now: Instant,
542 ) -> Result<(PathId, bool), PathError> {
543 let existing_open_path = self.paths.iter().find(|(id, path)| {
544 network_path.is_probably_same_path(&path.data.network_path)
545 && !self.abandoned_paths.contains(*id)
546 });
547 match existing_open_path {
548 Some((path_id, _state)) => Ok((*path_id, true)),
549 None => Ok((self.open_path(network_path, initial_status, now)?, false)),
550 }
551 }
552
553 pub fn open_path(
559 &mut self,
560 network_path: FourTuple,
561 initial_status: PathStatus,
562 now: Instant,
563 ) -> Result<PathId, PathError> {
564 if !self.is_multipath_negotiated() {
565 return Err(PathError::MultipathNotNegotiated);
566 }
567 if self.side().is_server() {
568 return Err(PathError::ServerSideNotAllowed);
569 }
570
571 let max_abandoned = self.abandoned_paths.iter().max().copied();
572 let max_used = self.paths.keys().last().copied();
573 let path_id = max_abandoned
574 .max(max_used)
575 .unwrap_or(PathId::ZERO)
576 .saturating_add(1u8);
577
578 if Some(path_id) > self.max_path_id() {
579 return Err(PathError::MaxPathIdReached);
580 }
581 if path_id > self.remote_max_path_id {
582 self.spaces[SpaceId::Data].pending.paths_blocked = true;
583 return Err(PathError::MaxPathIdReached);
584 }
585 if self
586 .remote_cids
587 .get(&path_id)
588 .map(CidQueue::active)
589 .is_none()
590 {
591 self.spaces[SpaceId::Data]
592 .pending
593 .path_cids_blocked
594 .insert(path_id);
595 return Err(PathError::RemoteCidsExhausted);
596 }
597
598 let path = self.ensure_path(path_id, network_path, now, None);
599 path.status.local_update(initial_status);
600
601 Ok(path_id)
602 }
603
604 pub fn close_path(
610 &mut self,
611 now: Instant,
612 path_id: PathId,
613 error_code: VarInt,
614 ) -> Result<(), ClosePathError> {
615 self.close_path_inner(
616 now,
617 path_id,
618 PathAbandonReason::ApplicationClosed { error_code },
619 )
620 }
621
622 pub(crate) fn close_path_inner(
627 &mut self,
628 now: Instant,
629 path_id: PathId,
630 reason: PathAbandonReason,
631 ) -> Result<(), ClosePathError> {
632 if self.state.is_drained() {
633 return Ok(());
634 }
635
636 if !self.is_multipath_negotiated() {
637 return Err(ClosePathError::MultipathNotNegotiated);
638 }
639 if self.abandoned_paths.contains(&path_id)
640 || Some(path_id) > self.max_path_id()
641 || !self.paths.contains_key(&path_id)
642 {
643 return Err(ClosePathError::ClosedPath);
644 }
645
646 let is_last_path = !self
647 .paths
648 .keys()
649 .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
650
651 if is_last_path && !reason.is_remote() {
652 return Err(ClosePathError::LastOpenPath);
653 }
654
655 self.abandon_path(now, path_id, reason);
656
657 if is_last_path {
661 let rtt = RttEstimator::new(self.config.initial_rtt);
665 let pto = rtt.pto_base() + self.ack_frequency.max_ack_delay_for_pto();
666 let grace = pto * 3;
667 self.timers.set(
668 Timer::Conn(ConnTimer::NoAvailablePath),
669 now + grace,
670 self.qlog.with_time(now),
671 );
672 }
673
674 Ok(())
675 }
676
677 fn abandon_path(&mut self, now: Instant, path_id: PathId, reason: PathAbandonReason) {
682 trace!(%path_id, ?reason, "abandoning path");
683
684 let pending_space = &mut self.spaces[SpaceId::Data].pending;
685 pending_space
687 .path_abandon
688 .insert(path_id, reason.error_code());
689
690 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
692 pending_space.path_cids_blocked.retain(|&id| id != path_id);
693 pending_space.path_status.retain(|&id| id != path_id);
694
695 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
697 for sent_packet in space.sent_packets.values_mut() {
698 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
699 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
700 retransmits.path_cids_blocked.retain(|&id| id != path_id);
701 retransmits.path_status.retain(|&id| id != path_id);
702 }
703 }
704 }
705
706 self.spaces[SpaceId::Data].for_path(path_id).loss_probes = 0;
711
712 debug_assert!(!self.state.is_drained()); self.endpoint_events
717 .push_back(EndpointEventInner::RetireResetToken(path_id));
718
719 self.abandoned_paths.insert(path_id);
720
721 for timer in timer::PathTimer::VALUES {
722 let keep_timer = match timer {
724 PathTimer::PathValidationFailed
728 | PathTimer::PathChallengeLost
729 | PathTimer::AbandonFromValidation => false,
730 PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
733 PathTimer::MaxAckDelay => false,
736 PathTimer::PathDrained => false,
739 PathTimer::LossDetection => true,
742 PathTimer::Pacing => true,
746 };
747
748 if !keep_timer {
749 let qlog = self.qlog.with_time(now);
750 self.timers.stop(Timer::PerPath(path_id, timer), qlog);
751 }
752 }
753
754 self.set_loss_detection_timer(now, path_id);
759
760 self.events.push_back(Event::Path(PathEvent::Abandoned {
762 id: path_id,
763 reason,
764 }));
765 }
766
767 #[track_caller]
771 fn path_data(&self, path_id: PathId) -> &PathData {
772 if let Some(data) = self.paths.get(&path_id) {
773 &data.data
774 } else {
775 panic!(
776 "unknown path: {path_id}, currently known paths: {:?}",
777 self.paths.keys().collect::<Vec<_>>()
778 );
779 }
780 }
781
782 #[track_caller]
786 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
787 &mut self.paths.get_mut(&path_id).expect("known path").data
788 }
789
790 fn path(&self, path_id: PathId) -> Option<&PathData> {
792 self.paths.get(&path_id).map(|path_state| &path_state.data)
793 }
794
795 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
797 self.paths
798 .get_mut(&path_id)
799 .map(|path_state| &mut path_state.data)
800 }
801
802 pub fn paths(&self) -> Vec<PathId> {
806 self.paths.keys().copied().collect()
807 }
808
809 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
811 self.path(path_id)
812 .map(PathData::local_status)
813 .ok_or(ClosedPath { _private: () })
814 }
815
816 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
818 self.path(path_id)
819 .map(|path| path.network_path)
820 .ok_or(ClosedPath { _private: () })
821 }
822
823 pub fn set_path_status(
827 &mut self,
828 path_id: PathId,
829 status: PathStatus,
830 ) -> Result<PathStatus, SetPathStatusError> {
831 if !self.is_multipath_negotiated() {
832 return Err(SetPathStatusError::MultipathNotNegotiated);
833 }
834 let path = self
835 .path_mut(path_id)
836 .ok_or(SetPathStatusError::ClosedPath)?;
837 let prev = match path.status.local_update(status) {
838 Some(prev) => {
839 self.spaces[SpaceId::Data]
840 .pending
841 .path_status
842 .insert(path_id);
843 prev
844 }
845 None => path.local_status(),
846 };
847 Ok(prev)
848 }
849
850 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
855 self.path(path_id).and_then(|path| path.remote_status())
856 }
857
858 pub fn set_path_max_idle_timeout(
867 &mut self,
868 now: Instant,
869 path_id: PathId,
870 timeout: Option<Duration>,
871 ) -> Result<Option<Duration>, ClosedPath> {
872 let path = self
873 .paths
874 .get_mut(&path_id)
875 .ok_or(ClosedPath { _private: () })?;
876 let prev = std::mem::replace(&mut path.data.idle_timeout, timeout);
877
878 if !self.state.is_closed() {
880 if let Some(new_timeout) = timeout {
881 let timer = Timer::PerPath(path_id, PathTimer::PathIdle);
882 let deadline = match (prev, self.timers.get(timer)) {
883 (Some(old_timeout), Some(old_deadline)) => {
884 let last_activity = old_deadline.checked_sub(old_timeout).unwrap_or(now);
885 last_activity + new_timeout
886 }
887 _ => now + new_timeout,
888 };
889 self.timers.set(timer, deadline, self.qlog.with_time(now));
890 } else {
891 self.timers.stop(
892 Timer::PerPath(path_id, PathTimer::PathIdle),
893 self.qlog.with_time(now),
894 );
895 }
896 }
897
898 Ok(prev)
899 }
900
901 pub fn set_path_keep_alive_interval(
907 &mut self,
908 path_id: PathId,
909 interval: Option<Duration>,
910 ) -> Result<Option<Duration>, ClosedPath> {
911 let path = self
912 .paths
913 .get_mut(&path_id)
914 .ok_or(ClosedPath { _private: () })?;
915 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
916 }
917
918 fn find_validated_path_on_network_path(
922 &self,
923 network_path: FourTuple,
924 ) -> Option<(&PathId, &PathState)> {
925 self.paths.iter().find(|(path_id, path_state)| {
926 path_state.data.validated
927 && network_path.is_probably_same_path(&path_state.data.network_path)
929 && !self.abandoned_paths.contains(path_id)
930 })
931 }
935
936 fn ensure_path(
940 &mut self,
941 path_id: PathId,
942 network_path: FourTuple,
943 now: Instant,
944 pn: Option<u64>,
945 ) -> &mut PathData {
946 let valid_path = self.find_validated_path_on_network_path(network_path);
947 let validated = valid_path.is_some();
948 let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
949 let vacant_entry = match self.paths.entry(path_id) {
950 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
951 btree_map::Entry::Occupied(occupied_entry) => {
952 return &mut occupied_entry.into_mut().data;
953 }
954 };
955
956 debug!(%validated, %path_id, %network_path, "path added");
957
958 self.timers.stop(
960 Timer::Conn(ConnTimer::NoAvailablePath),
961 self.qlog.with_time(now),
962 );
963 let peer_max_udp_payload_size =
964 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
965 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
966 let mut data = PathData::new(
967 network_path,
968 self.allow_mtud,
969 Some(peer_max_udp_payload_size),
970 self.path_generation_counter,
971 now,
972 &self.config,
973 );
974
975 data.validated = validated;
976 if let Some(initial_rtt) = initial_rtt {
977 data.rtt.reset_initial_rtt(initial_rtt);
978 }
979
980 data.pending_on_path_challenge = true;
983
984 let path = vacant_entry.insert(PathState { data, prev: None });
985
986 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
987 if let Some(pn) = pn {
988 pn_space.dedup.insert(pn);
989 }
990 self.spaces[SpaceId::Data]
991 .number_spaces
992 .insert(path_id, pn_space);
993 self.qlog.emit_tuple_assigned(path_id, network_path, now);
994
995 if !self.remote_cids.contains_key(&path_id) {
999 debug!(%path_id, "Remote opened path without issuing CIDs");
1000 self.spaces[SpaceId::Data]
1001 .pending
1002 .path_cids_blocked
1003 .insert(path_id);
1004 }
1007
1008 &mut path.data
1009 }
1010
1011 #[must_use]
1021 pub fn poll_transmit(
1022 &mut self,
1023 now: Instant,
1024 max_datagrams: NonZeroUsize,
1025 buf: &mut Vec<u8>,
1026 ) -> Option<Transmit> {
1027 let max_datagrams = match self.config.enable_segmentation_offload {
1028 false => NonZeroUsize::MIN,
1029 true => max_datagrams,
1030 };
1031
1032 let connection_close_pending = match self.state.as_type() {
1038 StateType::Drained => {
1039 for path in self.paths.values_mut() {
1040 path.data.app_limited = true;
1041 }
1042 return None;
1043 }
1044 StateType::Draining | StateType::Closed => {
1045 if !self.connection_close_pending {
1048 for path in self.paths.values_mut() {
1049 path.data.app_limited = true;
1050 }
1051 return None;
1052 }
1053 true
1054 }
1055 _ => false,
1056 };
1057
1058 if let Some(config) = &self.config.ack_frequency_config {
1060 let rtt = self
1061 .paths
1062 .values()
1063 .map(|p| p.data.rtt.get())
1064 .min()
1065 .expect("one path exists");
1066 self.spaces[SpaceId::Data].pending.ack_frequency = self
1067 .ack_frequency
1068 .should_send_ack_frequency(rtt, config, &self.peer_params)
1069 && self.highest_space == SpaceKind::Data
1070 && self.peer_supports_ack_frequency();
1071 }
1072
1073 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1074 while let Some(path_id) = next_path_id {
1075 if !connection_close_pending
1076 && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1077 {
1078 return Some(transmit);
1079 }
1080
1081 let info = self.scheduling_info(path_id);
1082 if let Some(transmit) = self.poll_transmit_on_path(
1083 now,
1084 buf,
1085 path_id,
1086 max_datagrams,
1087 &info,
1088 connection_close_pending,
1089 ) {
1090 return Some(transmit);
1091 }
1092
1093 debug_assert!(
1096 buf.is_empty(),
1097 "nothing to send on path but buffer not empty"
1098 );
1099
1100 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1101 }
1102
1103 debug_assert!(
1105 buf.is_empty(),
1106 "there was data in the buffer, but it was not sent"
1107 );
1108
1109 if self.state.is_established() {
1110 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1112 while let Some(path_id) = next_path_id {
1113 if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1114 return Some(transmit);
1115 }
1116 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1117 }
1118 }
1119
1120 None
1121 }
1122
1123 fn scheduling_info(&self, path_id: PathId) -> PathSchedulingInfo {
1141 let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1143 self.remote_cids.contains_key(path_id)
1144 && !self.abandoned_paths.contains(path_id)
1145 && path.data.validated
1146 && path.data.local_status() == PathStatus::Available
1147 });
1148
1149 let have_validated_space = self.paths.iter().any(|(path_id, path)| {
1151 self.remote_cids.contains_key(path_id)
1152 && !self.abandoned_paths.contains(path_id)
1153 && path.data.validated
1154 });
1155
1156 let is_handshaking = self.is_handshaking();
1157 let has_cids = self.remote_cids.contains_key(&path_id);
1158 let is_abandoned = self.abandoned_paths.contains(&path_id);
1159 let path_data = self.path_data(path_id);
1160 let validated = path_data.validated;
1161 let status = path_data.local_status();
1162
1163 let may_send_data = has_cids
1166 && !is_abandoned
1167 && if is_handshaking {
1168 true
1172 } else if !validated {
1173 false
1180 } else {
1181 match status {
1182 PathStatus::Available => {
1183 true
1185 }
1186 PathStatus::Backup => {
1187 !have_validated_status_available_space
1189 }
1190 }
1191 };
1192
1193 let may_send_close = has_cids
1198 && !is_abandoned
1199 && if !validated && have_validated_status_available_space {
1200 false
1202 } else {
1203 true
1205 };
1206
1207 let may_self_abandon = has_cids && validated && !have_validated_space;
1211
1212 PathSchedulingInfo {
1213 is_abandoned,
1214 may_send_data,
1215 may_send_close,
1216 may_self_abandon,
1217 }
1218 }
1219
1220 fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1221 debug_assert!(
1222 !transmit.is_empty(),
1223 "must not be called with an empty transmit buffer"
1224 );
1225
1226 let network_path = self.path_data(path_id).network_path;
1227 trace!(
1228 segment_size = transmit.segment_size(),
1229 last_datagram_len = transmit.len() % transmit.segment_size(),
1230 %network_path,
1231 "sending {} bytes in {} datagrams",
1232 transmit.len(),
1233 transmit.num_datagrams()
1234 );
1235 self.path_data_mut(path_id)
1236 .inc_total_sent(transmit.len() as u64);
1237
1238 self.path_stats
1239 .for_path(path_id)
1240 .udp_tx
1241 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1242
1243 Transmit {
1244 destination: network_path.remote,
1245 size: transmit.len(),
1246 ecn: if self.path_data(path_id).sending_ecn {
1247 Some(EcnCodepoint::Ect0)
1248 } else {
1249 None
1250 },
1251 segment_size: match transmit.num_datagrams() {
1252 1 => None,
1253 _ => Some(transmit.segment_size()),
1254 },
1255 src_ip: network_path.local_ip,
1256 }
1257 }
1258
1259 fn poll_transmit_off_path(
1261 &mut self,
1262 now: Instant,
1263 buf: &mut Vec<u8>,
1264 path_id: PathId,
1265 ) -> Option<Transmit> {
1266 if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1267 return Some(challenge);
1268 }
1269 if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1270 return Some(response);
1271 }
1272 if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1273 return Some(challenge);
1274 }
1275 None
1276 }
1277
1278 #[must_use]
1285 fn poll_transmit_on_path(
1286 &mut self,
1287 now: Instant,
1288 buf: &mut Vec<u8>,
1289 path_id: PathId,
1290 max_datagrams: NonZeroUsize,
1291 scheduling_info: &PathSchedulingInfo,
1292 connection_close_pending: bool,
1293 ) -> Option<Transmit> {
1294 let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
1296 if !self.abandoned_paths.contains(&path_id) {
1297 debug!(%path_id, "no remote CIDs for path");
1298 }
1299 return None;
1300 };
1301
1302 let mut pad_datagram = PadDatagram::No;
1308
1309 let mut last_packet_number = None;
1313
1314 let mut congestion_blocked = false;
1317
1318 let pmtu = self.path_data(path_id).current_mtu().into();
1320 let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);
1321
1322 for space_id in SpaceId::iter() {
1324 if path_id != PathId::ZERO && space_id != SpaceId::Data {
1326 continue;
1327 }
1328 match self.poll_transmit_path_space(
1329 now,
1330 &mut transmit,
1331 path_id,
1332 space_id,
1333 remote_cid,
1334 scheduling_info,
1335 connection_close_pending,
1336 pad_datagram,
1337 ) {
1338 PollPathSpaceStatus::NothingToSend {
1339 congestion_blocked: cb,
1340 } => {
1341 congestion_blocked |= cb;
1342 }
1345 PollPathSpaceStatus::WrotePacket {
1346 last_packet_number: pn,
1347 pad_datagram: pad,
1348 } => {
1349 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1350 last_packet_number = Some(pn);
1351 pad_datagram = pad;
1352 continue;
1357 }
1358 PollPathSpaceStatus::Send {
1359 last_packet_number: pn,
1360 } => {
1361 debug_assert!(!transmit.is_empty(), "transmit must contain packets");
1362 last_packet_number = Some(pn);
1363 break;
1364 }
1365 }
1366 }
1367
1368 if last_packet_number.is_some() || congestion_blocked {
1369 self.qlog.emit_recovery_metrics(
1370 path_id,
1371 &mut self
1372 .paths
1373 .get_mut(&path_id)
1374 .expect("path_id was iterated from self.paths above")
1375 .data,
1376 now,
1377 );
1378 }
1379
1380 self.path_data_mut(path_id).app_limited =
1381 last_packet_number.is_none() && !congestion_blocked;
1382
1383 match last_packet_number {
1384 Some(last_packet_number) => {
1385 self.path_data_mut(path_id).congestion.on_sent(
1388 now,
1389 transmit.len() as u64,
1390 last_packet_number,
1391 );
1392 Some(self.build_transmit(path_id, transmit))
1393 }
1394 None => None,
1395 }
1396 }
1397
1398 #[must_use]
1400 fn poll_transmit_path_space(
1401 &mut self,
1402 now: Instant,
1403 transmit: &mut TransmitBuf<'_>,
1404 path_id: PathId,
1405 space_id: SpaceId,
1406 remote_cid: ConnectionId,
1407 scheduling_info: &PathSchedulingInfo,
1408 connection_close_pending: bool,
1410 mut pad_datagram: PadDatagram,
1412 ) -> PollPathSpaceStatus {
1413 let mut last_packet_number = None;
1416
1417 loop {
1433 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1435 transmit.datagram_remaining_mut()
1437 } else {
1438 transmit.segment_size()
1440 };
1441 let can_send =
1442 self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1443 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1444 let space_will_send = {
1445 if scheduling_info.is_abandoned {
1446 scheduling_info.may_self_abandon
1451 && self.spaces[space_id]
1452 .pending
1453 .path_abandon
1454 .contains_key(&path_id)
1455 } else if can_send.close && scheduling_info.may_send_close {
1456 true
1458 } else if needs_loss_probe || can_send.space_specific {
1459 true
1462 } else {
1463 !can_send.is_empty() && scheduling_info.may_send_data
1466 }
1467 };
1468
1469 if !space_will_send {
1470 return match last_packet_number {
1473 Some(pn) => PollPathSpaceStatus::WrotePacket {
1474 last_packet_number: pn,
1475 pad_datagram,
1476 },
1477 None => {
1478 if self.crypto_state.has_keys(space_id.encryption_level())
1480 || (space_id == SpaceId::Data
1481 && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1482 {
1483 trace!(?space_id, %path_id, "nothing to send in space");
1484 }
1485 PollPathSpaceStatus::NothingToSend {
1486 congestion_blocked: false,
1487 }
1488 }
1489 };
1490 }
1491
1492 if transmit.datagram_remaining_mut() == 0 {
1496 let congestion_blocked =
1497 self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1498 if congestion_blocked != PathBlocked::No {
1499 return match last_packet_number {
1501 Some(pn) => PollPathSpaceStatus::WrotePacket {
1502 last_packet_number: pn,
1503 pad_datagram,
1504 },
1505 None => {
1506 return PollPathSpaceStatus::NothingToSend {
1507 congestion_blocked: true,
1508 };
1509 }
1510 };
1511 }
1512
1513 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1516 return match last_packet_number {
1519 Some(pn) => PollPathSpaceStatus::WrotePacket {
1520 last_packet_number: pn,
1521 pad_datagram,
1522 },
1523 None => {
1524 return PollPathSpaceStatus::NothingToSend {
1525 congestion_blocked: false,
1526 };
1527 }
1528 };
1529 }
1530
1531 if needs_loss_probe {
1532 let request_immediate_ack =
1534 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1535 self.spaces[space_id].queue_tail_loss_probe(
1536 path_id,
1537 request_immediate_ack,
1538 &self.streams,
1539 );
1540
1541 self.spaces[space_id].for_path(path_id).loss_probes -= 1; transmit.start_new_datagram_with_size(std::cmp::min(
1547 usize::from(INITIAL_MTU),
1548 transmit.segment_size(),
1549 ));
1550 } else {
1551 transmit.start_new_datagram();
1552 }
1553 trace!(count = transmit.num_datagrams(), "new datagram started");
1554
1555 pad_datagram = PadDatagram::No;
1557 }
1558
1559 if transmit.datagram_start_offset() < transmit.len() {
1562 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1563 }
1564
1565 if self.crypto_state.has_keys(EncryptionLevel::Initial)
1570 && space_id == SpaceId::Handshake
1571 && self.side.is_client()
1572 {
1573 self.discard_space(now, SpaceKind::Initial);
1576 }
1577 if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1578 prev.update_unacked = false;
1579 }
1580
1581 let Some(mut builder) =
1582 PacketBuilder::new(now, space_id, path_id, remote_cid, transmit, self)
1583 else {
1584 return PollPathSpaceStatus::NothingToSend {
1591 congestion_blocked: false,
1592 };
1593 };
1594 last_packet_number = Some(builder.packet_number);
1595
1596 if space_id == SpaceId::Initial
1597 && (self.side.is_client() || can_send.is_ack_eliciting())
1598 {
1599 pad_datagram |= PadDatagram::ToMinMtu;
1601 }
1602 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1603 pad_datagram |= PadDatagram::ToSegmentSize;
1604 }
1605
1606 if scheduling_info.may_send_close && can_send.close {
1607 trace!("sending CONNECTION_CLOSE");
1608 let is_multipath_negotiated = self.is_multipath_negotiated();
1613 for path_id in self.spaces[space_id]
1614 .number_spaces
1615 .iter()
1616 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1617 .map(|(&path_id, _)| path_id)
1618 .collect::<Vec<_>>()
1619 {
1620 Self::populate_acks(
1621 now,
1622 self.receiving_ecn,
1623 path_id,
1624 space_id,
1625 &mut self.spaces[space_id],
1626 is_multipath_negotiated,
1627 &mut builder,
1628 &mut self.path_stats.for_path(path_id).frame_tx,
1629 self.crypto_state.has_keys(space_id.encryption_level()),
1630 );
1631 }
1632
1633 debug_assert!(
1641 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1642 "ACKs should leave space for ConnectionClose"
1643 );
1644 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
1645 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1646 let max_frame_size = builder.frame_space_remaining();
1647 let close: Close = match self.state.as_type() {
1648 StateType::Closed => {
1649 let reason: Close =
1650 self.state.as_closed().expect("checked").clone().into();
1651 if space_id == SpaceId::Data || reason.is_transport_layer() {
1652 reason
1653 } else {
1654 TransportError::APPLICATION_ERROR("").into()
1655 }
1656 }
1657 StateType::Draining => TransportError::NO_ERROR("").into(),
1658 _ => unreachable!(
1659 "tried to make a close packet when the connection wasn't closed"
1660 ),
1661 };
1662 builder.write_frame(close.encoder(max_frame_size), stats);
1663 }
1664 let last_pn = builder.packet_number;
1665 builder.finish_and_track(now, self, path_id, pad_datagram);
1666 if space_id.kind() == self.highest_space {
1667 self.connection_close_pending = false;
1670 }
1671 return PollPathSpaceStatus::WrotePacket {
1684 last_packet_number: last_pn,
1685 pad_datagram,
1686 };
1687 }
1688
1689 self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1690
1691 debug_assert!(
1698 !(builder.sent_frames().is_ack_only(&self.streams)
1699 && !can_send.acks
1700 && (can_send.other || can_send.space_specific)
1701 && builder.buf.segment_size()
1702 == self.path_data(path_id).current_mtu() as usize
1703 && self.datagrams.outgoing.is_empty()),
1704 "SendableFrames was {can_send:?}, but only ACKs have been written"
1705 );
1706 if builder.sent_frames().requires_padding {
1707 pad_datagram |= PadDatagram::ToMinMtu;
1708 }
1709
1710 for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1711 self.spaces[space_id]
1712 .for_path(*path_id)
1713 .pending_acks
1714 .acks_sent();
1715 self.timers.stop(
1716 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1717 self.qlog.with_time(now),
1718 );
1719 }
1720
1721 if builder.can_coalesce && path_id == PathId::ZERO && {
1729 let max_packet_size = builder
1730 .buf
1731 .datagram_remaining_mut()
1732 .saturating_sub(builder.predict_packet_end());
1733 max_packet_size > MIN_PACKET_SPACE
1734 && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1735 } {
1736 trace!("will coalesce with next packet");
1739 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1740 } else {
1741 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1747 const MAX_PADDING: usize = 32;
1755 if builder.buf.datagram_remaining_mut()
1756 > builder.predict_packet_end() + MAX_PADDING
1757 {
1758 trace!(
1759 "GSO truncated by demand for {} padding bytes",
1760 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1761 );
1762 let last_pn = builder.packet_number;
1763 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1764 return PollPathSpaceStatus::Send {
1765 last_packet_number: last_pn,
1766 };
1767 }
1768
1769 builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1772 } else {
1773 builder.finish_and_track(now, self, path_id, pad_datagram);
1774 }
1775
1776 if transmit.num_datagrams() == 1 {
1779 transmit.clip_segment_size();
1780 }
1781 }
1782 }
1783 }
1784
1785 fn poll_transmit_mtu_probe(
1786 &mut self,
1787 now: Instant,
1788 buf: &mut Vec<u8>,
1789 path_id: PathId,
1790 ) -> Option<Transmit> {
1791 let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;
1792
1793 let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
1795 transmit.start_new_datagram_with_size(probe_size as usize);
1796
1797 let mut builder =
1798 PacketBuilder::new(now, SpaceId::Data, path_id, active_cid, &mut transmit, self)?;
1799
1800 trace!(?probe_size, "writing MTUD probe");
1802 builder.write_frame(frame::Ping, &mut self.path_stats.for_path(path_id).frame_tx);
1803
1804 if self.peer_supports_ack_frequency() {
1806 builder.write_frame(
1807 frame::ImmediateAck,
1808 &mut self.path_stats.for_path(path_id).frame_tx,
1809 );
1810 }
1811
1812 builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1813
1814 self.path_stats.for_path(path_id).sent_plpmtud_probes += 1;
1815
1816 Some(self.build_transmit(path_id, transmit))
1817 }
1818
1819 fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1827 let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1828 let is_eligible = self.path_data(path_id).validated
1829 && !self.path_data(path_id).is_validating_path()
1830 && !self.abandoned_paths.contains(&path_id);
1831
1832 if !is_eligible {
1833 return None;
1834 }
1835 let next_pn = self.spaces[SpaceId::Data]
1836 .for_path(path_id)
1837 .peek_tx_number();
1838 let probe_size = self
1839 .path_data_mut(path_id)
1840 .mtud
1841 .poll_transmit(now, next_pn)?;
1842
1843 Some((active_cid, probe_size))
1844 }
1845
1846 fn has_pending_packet(
1863 &mut self,
1864 current_space_id: SpaceId,
1865 max_packet_size: usize,
1866 connection_close_pending: bool,
1867 ) -> bool {
1868 let mut space_id = current_space_id;
1869 loop {
1870 let can_send = self.space_can_send(
1871 space_id,
1872 PathId::ZERO,
1873 max_packet_size,
1874 connection_close_pending,
1875 );
1876 if !can_send.is_empty() {
1877 return true;
1878 }
1879 match space_id.next() {
1880 Some(next_space_id) => space_id = next_space_id,
1881 None => break,
1882 }
1883 }
1884 false
1885 }
1886
1887 fn path_congestion_check(
1889 &mut self,
1890 space_id: SpaceId,
1891 path_id: PathId,
1892 transmit: &TransmitBuf<'_>,
1893 can_send: &SendableFrames,
1894 now: Instant,
1895 ) -> PathBlocked {
1896 if self.side().is_server()
1902 && self
1903 .path_data(path_id)
1904 .anti_amplification_blocked(transmit.len() as u64 + 1)
1905 {
1906 trace!(?space_id, %path_id, "blocked by anti-amplification");
1907 return PathBlocked::AntiAmplification;
1908 }
1909
1910 let bytes_to_send = transmit.segment_size() as u64;
1913 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1914
1915 if can_send.other && !need_loss_probe && !can_send.close {
1916 let path = self.path_data(path_id);
1917 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1918 trace!(
1919 ?space_id,
1920 %path_id,
1921 in_flight=%path.in_flight.bytes,
1922 congestion_window=%path.congestion.window(),
1923 "blocked by congestion control",
1924 );
1925 return PathBlocked::Congestion;
1926 }
1927 }
1928
1929 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1931 let resume_time = now + delay;
1932 self.timers.set(
1933 Timer::PerPath(path_id, PathTimer::Pacing),
1934 resume_time,
1935 self.qlog.with_time(now),
1936 );
1937 trace!(?space_id, %path_id, ?delay, "blocked by pacing");
1940 return PathBlocked::Pacing;
1941 }
1942
1943 PathBlocked::No
1944 }
1945
1946 fn send_prev_path_challenge(
1951 &mut self,
1952 now: Instant,
1953 buf: &mut Vec<u8>,
1954 path_id: PathId,
1955 ) -> Option<Transmit> {
1956 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1957 if !prev_path.pending_on_path_challenge {
1958 return None;
1959 };
1960 prev_path.pending_on_path_challenge = false;
1961 let token = self.rng.random();
1962 let network_path = prev_path.network_path;
1963 prev_path.record_path_challenge_sent(now, token, network_path);
1964
1965 debug_assert_eq!(
1966 self.highest_space,
1967 SpaceKind::Data,
1968 "PATH_CHALLENGE queued without 1-RTT keys"
1969 );
1970 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
1971 buf.start_new_datagram();
1972
1973 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, self)?;
1979 let challenge = frame::PathChallenge(token);
1980 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
1981 builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));
1982
1983 builder.pad_to(MIN_INITIAL_SIZE);
1988
1989 builder.finish(self, now);
1990 self.path_stats
1991 .for_path(path_id)
1992 .udp_tx
1993 .on_sent(1, buf.len());
1994
1995 trace!(
1996 dst = ?network_path.remote,
1997 src = ?network_path.local_ip,
1998 len = buf.len(),
1999 "sending prev_path off-path challenge",
2000 );
2001 Some(Transmit {
2002 destination: network_path.remote,
2003 size: buf.len(),
2004 ecn: None,
2005 segment_size: None,
2006 src_ip: network_path.local_ip,
2007 })
2008 }
2009
2010 fn send_off_path_path_response(
2011 &mut self,
2012 now: Instant,
2013 buf: &mut Vec<u8>,
2014 path_id: PathId,
2015 ) -> Option<Transmit> {
2016 let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
2017 let cid_queue = self.remote_cids.get_mut(&path_id)?;
2018 let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;
2019
2020 let cid = cid_queue.active();
2022
2023 let frame = frame::PathResponse(token);
2024
2025 let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2026 buf.start_new_datagram();
2027
2028 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, self)?;
2029 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2030 builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
2031
2032 if self
2037 .find_validated_path_on_network_path(network_path)
2038 .is_none()
2039 && self.n0_nat_traversal.client_side().is_ok()
2040 {
2041 let token = self.rng.random();
2042 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2043 builder.write_frame(frame::PathChallenge(token), stats);
2044 let ip_port = (network_path.remote.ip(), network_path.remote.port());
2045 self.n0_nat_traversal.mark_probe_sent(ip_port, token);
2046 }
2047
2048 builder.pad_to(MIN_INITIAL_SIZE);
2051 builder.finish(self, now);
2052
2053 let size = buf.len();
2054 self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);
2055
2056 trace!(
2057 dst = ?network_path.remote,
2058 src = ?network_path.local_ip,
2059 len = buf.len(),
2060 "sending off-path PATH_RESPONSE",
2061 );
2062 Some(Transmit {
2063 destination: network_path.remote,
2064 size,
2065 ecn: None,
2066 segment_size: None,
2067 src_ip: network_path.local_ip,
2068 })
2069 }
2070
2071 fn send_nat_traversal_path_challenge(
2073 &mut self,
2074 now: Instant,
2075 buf: &mut Vec<u8>,
2076 path_id: PathId,
2077 ) -> Option<Transmit> {
2078 let remote = self.n0_nat_traversal.next_probe_addr()?;
2079
2080 if !self.paths.get(&path_id)?.data.validated {
2081 return None;
2083 }
2084
2085 let Some(cid) = self
2090 .remote_cids
2091 .get(&path_id)
2092 .map(|cid_queue| cid_queue.active())
2093 else {
2094 trace!(%path_id, "Not sending NAT traversal probe for path with no CIDs");
2095 return None;
2096 };
2097 let token = self.rng.random();
2098
2099 let frame = frame::PathChallenge(token);
2100
2101 let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
2102 buf.start_new_datagram();
2103
2104 let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, self)?;
2105 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
2106 builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
2107 builder.finish(self, now);
2110
2111 self.n0_nat_traversal.mark_probe_sent(remote, token);
2113
2114 let size = buf.len();
2115 self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);
2116
2117 trace!(dst = ?remote, len = buf.len(), "sending off-path NAT probe");
2118 Some(Transmit {
2119 destination: remote.into(),
2120 size,
2121 ecn: None,
2122 segment_size: None,
2123 src_ip: None,
2124 })
2125 }
2126
2127 fn space_can_send(
2135 &mut self,
2136 space_id: SpaceId,
2137 path_id: PathId,
2138 packet_size: usize,
2139 connection_close_pending: bool,
2140 ) -> SendableFrames {
2141 let space = &mut self.spaces[space_id];
2142 let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2143
2144 if !space_has_crypto
2145 && (space_id != SpaceId::Data
2146 || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2147 || self.side.is_server())
2148 {
2149 return SendableFrames::empty();
2151 }
2152
2153 let mut can_send = space.can_send(path_id, &self.streams);
2154
2155 if space_id == SpaceId::Data {
2157 let pn = space.for_path(path_id).peek_tx_number();
2158 let frame_space_1rtt =
2164 packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2165 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2166 }
2167
2168 can_send.close = connection_close_pending && space_has_crypto;
2169
2170 can_send
2171 }
2172
2173 pub fn handle_event(&mut self, event: ConnectionEvent) {
2179 use ConnectionEventInner::*;
2180 match event.0 {
2181 Datagram(DatagramConnectionEvent {
2182 now,
2183 network_path,
2184 path_id,
2185 ecn,
2186 first_decode,
2187 remaining,
2188 }) => {
2189 let span = trace_span!("pkt", %path_id);
2190 let _guard = span.enter();
2191
2192 if self.early_discard_packet(network_path, path_id) {
2193 return;
2195 }
2196
2197 let was_anti_amplification_blocked = self
2198 .path(path_id)
2199 .map(|path| path.anti_amplification_blocked(1))
2200 .unwrap_or(false);
2203
2204 let rx = &mut self.path_stats.for_path(path_id).udp_rx;
2205 rx.datagrams += 1;
2206 rx.bytes += first_decode.len() as u64;
2207 let data_len = first_decode.len();
2208
2209 self.handle_decode(now, network_path, path_id, ecn, first_decode);
2210 if let Some(path) = self.path_mut(path_id) {
2215 path.inc_total_recvd(data_len as u64);
2216 }
2217
2218 if let Some(data) = remaining {
2219 self.path_stats.for_path(path_id).udp_rx.bytes += data.len() as u64;
2220 self.handle_coalesced(now, network_path, path_id, ecn, data);
2221 }
2222
2223 if let Some(path) = self.paths.get_mut(&path_id) {
2224 self.qlog
2225 .emit_recovery_metrics(path_id, &mut path.data, now);
2226 }
2227
2228 if was_anti_amplification_blocked {
2229 self.set_loss_detection_timer(now, path_id);
2233 }
2234 }
2235 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
2236 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
2237 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
2238
2239 if self.abandoned_paths.contains(&path_id) {
2242 if !self.state.is_drained() {
2243 for issued in &ids {
2244 self.endpoint_events
2245 .push_back(EndpointEventInner::RetireConnectionId(
2246 now,
2247 path_id,
2248 issued.sequence,
2249 false,
2250 ));
2251 }
2252 }
2253 return;
2254 }
2255
2256 let cid_state = self
2257 .local_cid_state
2258 .entry(path_id)
2259 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
2260 cid_state.new_cids(&ids, now);
2261
2262 ids.into_iter().rev().for_each(|frame| {
2263 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
2264 });
2265 self.reset_cid_retirement(now);
2267 }
2268 }
2269 }
2270
2271 fn early_discard_packet(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
2279 if self.is_handshaking() && path_id != PathId::ZERO {
2280 debug!(%network_path, %path_id, "discarding multipath packet during handshake");
2281 return true;
2282 }
2283
2284 let peer_may_probe = self.peer_may_probe();
2285 let local_ip_may_migrate = self.local_ip_may_migrate();
2286
2287 if let Some(known_path) = self.path_mut(path_id) {
2291 if network_path.remote != known_path.network_path.remote && !peer_may_probe {
2292 trace!(
2293 %path_id,
2294 %network_path,
2295 %known_path.network_path,
2296 "discarding packet from unrecognized peer"
2297 );
2298 return true;
2299 }
2300
2301 if known_path.network_path.local_ip.is_some()
2302 && network_path.local_ip.is_some()
2303 && known_path.network_path.local_ip != network_path.local_ip
2304 && !local_ip_may_migrate
2305 {
2306 trace!(
2307 %path_id,
2308 %network_path,
2309 %known_path.network_path,
2310 "discarding packet sent to incorrect interface"
2311 );
2312 return true;
2313 }
2314 }
2315 false
2316 }
2317
2318 fn peer_may_probe(&self) -> bool {
2329 match &self.side {
2330 ConnectionSide::Client { .. } => {
2331 if let Some(hs) = self.state.as_handshake() {
2332 hs.allow_server_migration
2333 } else {
2334 self.n0_nat_traversal.is_negotiated() && self.is_handshake_confirmed()
2335 }
2336 }
2337 ConnectionSide::Server { server_config } => {
2338 self.is_handshake_confirmed()
2339 && (server_config.migration || self.n0_nat_traversal.is_negotiated())
2340 }
2341 }
2342 }
2343
2344 fn peer_may_migrate(&self) -> bool {
2356 match &self.side {
2357 ConnectionSide::Server { server_config } => {
2358 server_config.migration && self.is_handshake_confirmed()
2359 }
2360 ConnectionSide::Client { .. } => false,
2361 }
2362 }
2363
2364 fn local_ip_may_migrate(&self) -> bool {
2377 (self.side.is_client() || self.n0_nat_traversal.is_negotiated())
2378 && self.is_handshake_confirmed()
2379 }
2380 pub fn handle_timeout(&mut self, now: Instant) {
2390 while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
2391 let span = match timer {
2392 Timer::Conn(timer) => trace_span!("timeout", scope = "conn", ?timer),
2393 Timer::PerPath(path_id, timer) => {
2394 trace_span!("timer_fired", scope="path", %path_id, ?timer)
2395 }
2396 };
2397 let _guard = span.enter();
2398 trace!("timeout");
2399 match timer {
2400 Timer::Conn(timer) => match timer {
2401 ConnTimer::Close => {
2402 let was_draining = self.state.move_to_drained(None);
2403 if !was_draining {
2404 self.endpoint_events.push_back(EndpointEventInner::Draining);
2405 }
2406 self.endpoint_events.push_back(EndpointEventInner::Drained);
2409 }
2410 ConnTimer::Idle => {
2411 self.kill(ConnectionError::TimedOut);
2412 }
2413 ConnTimer::KeepAlive => {
2414 self.ping();
2415 }
2416 ConnTimer::KeyDiscard => {
2417 self.crypto_state.discard_temporary_keys();
2418 }
2419 ConnTimer::PushNewCid => {
2420 while let Some((path_id, when)) = self.next_cid_retirement() {
2421 if when > now {
2422 break;
2423 }
2424 match self.local_cid_state.get_mut(&path_id) {
2425 None => error!(%path_id, "No local CID state for path"),
2426 Some(cid_state) => {
2427 let num_new_cid = cid_state.on_cid_timeout().into();
2429 if !self.state.is_closed() {
2430 trace!(
2431 "push a new CID to peer RETIRE_PRIOR_TO field {}",
2432 cid_state.retire_prior_to()
2433 );
2434 self.endpoint_events.push_back(
2435 EndpointEventInner::NeedIdentifiers(
2436 path_id,
2437 now,
2438 num_new_cid,
2439 ),
2440 );
2441 }
2442 }
2443 }
2444 }
2445 }
2446 ConnTimer::NoAvailablePath => {
2447 if self.state.is_closed() || self.state.is_drained() {
2452 error!("no viable path timer fired, but connection already closing");
2455 } else {
2456 trace!("no viable path grace period expired, closing connection");
2457 let err = TransportError::NO_VIABLE_PATH(
2458 "last path abandoned, no new path opened",
2459 );
2460 self.close_common();
2461 self.set_close_timer(now);
2462 self.connection_close_pending = true;
2463 self.state.move_to_closed(err);
2464 }
2465 }
2466 ConnTimer::NatTraversalProbeRetry => {
2467 self.n0_nat_traversal.queue_retries(self.is_ipv6());
2468 if let Some(delay) =
2469 self.n0_nat_traversal.retry_delay(self.config.initial_rtt)
2470 {
2471 self.timers.set(
2472 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
2473 now + delay,
2474 self.qlog.with_time(now),
2475 );
2476 trace!("re-queued NAT probes");
2477 } else {
2478 trace!("no more NAT probes remaining");
2479 }
2480 }
2481 },
2482 Timer::PerPath(path_id, timer) => {
2483 match timer {
2484 PathTimer::PathIdle => {
2485 if let Err(err) =
2486 self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
2487 {
2488 warn!(?err, "failed closing path");
2489 }
2490 }
2491
2492 PathTimer::PathKeepAlive => {
2493 self.ping_path(path_id).ok();
2494 }
2495 PathTimer::LossDetection => {
2496 self.on_loss_detection_timeout(now, path_id);
2497 self.qlog.emit_recovery_metrics(
2498 path_id,
2499 &mut self
2500 .paths
2501 .get_mut(&path_id)
2502 .expect("loss-detection timer fires only on live paths")
2503 .data,
2504 now,
2505 );
2506 }
2507 PathTimer::PathValidationFailed => {
2508 let Some(path) = self.paths.get_mut(&path_id) else {
2509 continue;
2510 };
2511 self.timers.stop(
2512 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2513 self.qlog.with_time(now),
2514 );
2515 debug!("path migration validation failed");
2516 if let Some((_, prev)) = path.prev.take() {
2517 path.data = prev;
2518 }
2519 path.data.reset_on_path_challenges();
2520 }
2521 PathTimer::PathChallengeLost => {
2522 let Some(path) = self.paths.get_mut(&path_id) else {
2523 continue;
2524 };
2525 trace!("path challenge deemed lost");
2526 path.data.pending_on_path_challenge = true;
2527 }
2528 PathTimer::AbandonFromValidation => {
2529 let Some(path) = self.paths.get_mut(&path_id) else {
2530 continue;
2531 };
2532 path.data.reset_on_path_challenges();
2533 self.timers.stop(
2534 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2535 self.qlog.with_time(now),
2536 );
2537 debug!("new path validation failed");
2538 if let Err(err) = self.close_path_inner(
2539 now,
2540 path_id,
2541 PathAbandonReason::ValidationFailed,
2542 ) {
2543 warn!(?err, "failed closing path");
2544 }
2545 }
2546 PathTimer::Pacing => {}
2547 PathTimer::MaxAckDelay => {
2548 self.spaces[SpaceId::Data]
2550 .for_path(path_id)
2551 .pending_acks
2552 .on_max_ack_delay_timeout()
2553 }
2554 PathTimer::PathDrained => {
2555 self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2558 if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
2559 debug_assert!(!self.state.is_drained()); let (min_seq, max_seq) = local_cid_state.active_seq();
2561 for seq in min_seq..=max_seq {
2562 self.endpoint_events.push_back(
2563 EndpointEventInner::RetireConnectionId(
2564 now, path_id, seq, false,
2565 ),
2566 );
2567 }
2568 }
2569 self.discard_path(path_id, now);
2570 }
2571 }
2572 }
2573 }
2574 }
2575 }
2576
2577 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2589 self.close_inner(
2590 now,
2591 Close::Application(frame::ApplicationClose { error_code, reason }),
2592 )
2593 }
2594
2595 fn close_inner(&mut self, now: Instant, reason: Close) {
2611 let was_closed = self.state.is_closed();
2612 if !was_closed {
2613 self.close_common();
2614 self.set_close_timer(now);
2615 self.connection_close_pending = true;
2616 self.state.move_to_closed_local(reason);
2617 }
2618 }
2619
2620 pub fn datagrams(&mut self) -> Datagrams<'_> {
2622 Datagrams { conn: self }
2623 }
2624
2625 pub fn stats(&mut self) -> ConnectionStats {
2627 let mut stats = self.partial_stats.clone();
2628
2629 for path_stats in self.path_stats.iter_stats() {
2630 stats += *path_stats;
2635 }
2636
2637 stats
2638 }
2639
2640 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2642 let path = self.paths.get(&path_id)?;
2643 let stats = self.path_stats.for_path(path_id);
2644 stats.rtt = path.data.rtt.get();
2645 stats.cwnd = path.data.congestion.window();
2646 stats.current_mtu = path.data.mtud.current_mtu();
2647 Some(*stats)
2648 }
2649
2650 pub fn ping(&mut self) {
2654 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2657 path_data.ping_pending = true;
2658 }
2659 }
2660
2661 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2665 let path_data = self.spaces[self.highest_space]
2666 .number_spaces
2667 .get_mut(&path)
2668 .ok_or(ClosedPath { _private: () })?;
2669 path_data.ping_pending = true;
2670 Ok(())
2671 }
2672
2673 pub fn force_key_update(&mut self) {
2677 if !self.state.is_established() {
2678 debug!("ignoring forced key update in illegal state");
2679 return;
2680 }
2681 if self.crypto_state.prev_crypto.is_some() {
2682 debug!("ignoring redundant forced key update");
2685 return;
2686 }
2687 self.crypto_state.update_keys(None, false);
2688 }
2689
2690 pub fn crypto_session(&self) -> &dyn crypto::Session {
2692 self.crypto_state.session.as_ref()
2693 }
2694
2695 pub fn is_handshaking(&self) -> bool {
2705 self.state.is_handshake()
2706 }
2707
2708 pub fn is_closed(&self) -> bool {
2719 self.state.is_closed()
2720 }
2721
2722 pub fn is_drained(&self) -> bool {
2727 self.state.is_drained()
2728 }
2729
2730 pub fn accepted_0rtt(&self) -> bool {
2734 self.crypto_state.accepted_0rtt
2735 }
2736
2737 pub fn has_0rtt(&self) -> bool {
2739 self.crypto_state.zero_rtt_enabled
2740 }
2741
2742 pub fn has_pending_retransmits(&self) -> bool {
2744 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2745 }
2746
2747 pub fn side(&self) -> Side {
2749 self.side.side()
2750 }
2751
2752 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2754 self.path(path_id)
2755 .map(|path_data| {
2756 path_data
2757 .last_observed_addr_report
2758 .as_ref()
2759 .map(|observed| observed.socket_addr())
2760 })
2761 .ok_or(ClosedPath { _private: () })
2762 }
2763
2764 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2766 self.path(path_id).map(|d| d.rtt.get())
2767 }
2768
2769 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2771 self.path(path_id).map(|d| d.congestion.as_ref())
2772 }
2773
2774 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2779 self.streams.set_max_concurrent(dir, count);
2780 let pending = &mut self.spaces[SpaceId::Data].pending;
2783 self.streams.queue_max_stream_id(pending);
2784 }
2785
2786 pub fn set_max_concurrent_paths(
2796 &mut self,
2797 now: Instant,
2798 count: NonZeroU32,
2799 ) -> Result<(), MultipathNotNegotiated> {
2800 if !self.is_multipath_negotiated() {
2801 return Err(MultipathNotNegotiated { _private: () });
2802 }
2803 self.max_concurrent_paths = count;
2804
2805 let in_use_count = self
2806 .local_max_path_id
2807 .next()
2808 .saturating_sub(self.abandoned_paths.len() as u32)
2809 .as_u32();
2810 let extra_needed = count.get().saturating_sub(in_use_count);
2811 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2812
2813 self.set_max_path_id(now, new_max_path_id);
2814
2815 Ok(())
2816 }
2817
2818 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2820 if max_path_id <= self.local_max_path_id {
2821 return;
2822 }
2823
2824 self.local_max_path_id = max_path_id;
2825 self.spaces[SpaceId::Data].pending.max_path_id = true;
2826
2827 self.issue_first_path_cids(now);
2828 }
2829
2830 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2836 self.streams.max_concurrent(dir)
2837 }
2838
2839 pub fn set_send_window(&mut self, send_window: u64) {
2841 self.streams.set_send_window(send_window);
2842 }
2843
2844 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2846 if self.streams.set_receive_window(receive_window) {
2847 self.spaces[SpaceId::Data].pending.max_data = true;
2848 }
2849 }
2850
2851 pub fn is_multipath_negotiated(&self) -> bool {
2856 !self.is_handshaking()
2857 && self.config.max_concurrent_multipath_paths.is_some()
2858 && self.peer_params.initial_max_path_id.is_some()
2859 }
2860
2861 fn on_ack_received(
2862 &mut self,
2863 now: Instant,
2864 space: SpaceId,
2865 ack: frame::Ack,
2866 ) -> Result<(), TransportError> {
2867 let path = PathId::ZERO;
2869 self.inner_on_ack_received(now, space, path, ack)
2870 }
2871
2872 fn on_path_ack_received(
2873 &mut self,
2874 now: Instant,
2875 space: SpaceId,
2876 path_ack: frame::PathAck,
2877 ) -> Result<(), TransportError> {
2878 let (ack, path) = path_ack.into_ack();
2879 self.inner_on_ack_received(now, space, path, ack)
2880 }
2881
/// Processes an acknowledgement for packets sent in `space` on `path`.
///
/// Returns `Ok(())` early, without touching any state, when the ACK is for an
/// abandoned path. It also returns `Ok(())` early when the ACK covers no packet that
/// is still tracked as sent; by then the largest-acked bookkeeping and the
/// spurious-loss check have already run.
///
/// # Errors
///
/// Returns a `PROTOCOL_VIOLATION` transport error when the path has no packet number
/// space and was not abandoned, or when the ACK covers a packet number that has not
/// been sent yet. Errors returned by `check_ack` are propagated.
fn inner_on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    path: PathId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    // ACKs for a path without a packet number space are only tolerated if the path
    // was abandoned.
    if !self.spaces[space].number_spaces.contains_key(&path) {
        if self.abandoned_paths.contains(&path) {
            trace!("silently ignoring PATH_ACK on discarded path");
            return Ok(());
        } else {
            return Err(TransportError::PROTOCOL_VIOLATION(
                "received PATH_ACK with path ID never used",
            ));
        }
    }
    if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // `Some(ack.largest)` only if this ACK raises the largest acknowledged packet
    // number of the path; `None` otherwise.
    let new_largest_pn = {
        let space = &mut self.spaces[space].for_path(path);
        if space
            .largest_acked_packet_pn
            .is_none_or(|pn| ack.largest > pn)
        {
            space.largest_acked_packet_pn = Some(ack.largest);
            // The send time is only updated if the packet is still tracked as sent.
            if let Some(info) = space.sent_packets.get(ack.largest) {
                space.largest_acked_packet_send_time = info.time_sent;
            }
            Some(ack.largest)
        } else {
            None
        }
    };

    // Count the event and inform the congestion controller when
    // `detect_spurious_loss` reports a spurious loss for this ACK.
    if self.detect_spurious_loss(&ack, space, path) {
        self.path_stats.for_path(path).spurious_congestion_events += 1;
        self.path_data_mut(path)
            .congestion
            .on_spurious_congestion_event();
    }

    // Collect only the acknowledged packet numbers that are still tracked as sent.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.spaces[space].for_path(path).check_ack(range.clone())?;
        for (pn, _) in self.spaces[space]
            .for_path(path)
            .sent_packets
            .iter_range(range)
        {
            newly_acked.insert_one(pn);
        }
    }

    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].for_path(path).take(packet) {
            // `packet` itself carried acknowledgements: trim our pending ACK state
            // below the largest packet number it acknowledged, per path.
            for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                    pns.pending_acks.subtract_below(*acked_pn);
                }
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery see the acknowledgement; forward an MTU change to the
            // congestion controller.
            let path_data = self.path_data_mut(path);
            let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
            if mtu_updated {
                path_data
                    .congestion
                    .on_mtu_update(path_data.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(path, packet);

            self.on_packet_acked(now, path, packet, info);
        }
    }

    let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet_pn;
    let path_data = self.path_data_mut(path);
    let app_limited = path_data.app_limited;
    let in_flight = path_data.in_flight.bytes;

    path_data
        .congestion
        .on_end_acks(now, in_flight, app_limited, largest_ackd);

    // An RTT sample is only taken when the largest acknowledged packet is new and at
    // least one newly acknowledged packet was ack-eliciting.
    if new_largest_pn.is_some() && ack_eliciting_acked {
        // Outside the Data space the ACK delay is taken as zero; in the Data space it
        // is the peer-reported delay, capped at `peer_max_ack_delay`.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = now.saturating_duration_since(
            self.spaces[space]
                .for_path(path)
                .largest_acked_packet_send_time,
        );

        let next_pn = self.spaces[space].for_path(path).next_packet_number;
        let path_data = self.path_data_mut(path);
        path_data.rtt.update(ack_delay, rtt);
        // Remember the next packet number to be sent after the first RTT sample.
        if path_data.first_packet_after_rtt_sample.is_none() {
            path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
        }
    }

    // NOTE(review): loss detection runs before `pto_count` is reset below; the order
    // looks intentional — confirm before reordering.
    self.detect_lost_packets(now, space, path, true);

    if self.peer_completed_handshake_address_validation() {
        self.path_data_mut(path).pto_count = 0;
    }

    if self.path_data(path).sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only processed for ACKs that raised the largest
            // acknowledged packet number.
            if let Some(largest_sent_pn) = new_largest_pn {
                let sent = self.spaces[space]
                    .for_path(path)
                    .largest_acked_packet_send_time;
                self.process_ecn(
                    now,
                    space,
                    path,
                    newly_acked.len() as u64,
                    ecn,
                    sent,
                    largest_sent_pn,
                );
            }
        } else {
            // The ACK carries no ECN counts: stop sending ECN on this path.
            debug!("ECN not acknowledged by peer");
            self.path_data_mut(path).sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now, path);
    Ok(())
}
3061
3062 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
3063 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
3064
3065 if lost_packets.is_empty() {
3066 return false;
3067 }
3068
3069 for range in ack.iter() {
3070 let spurious_losses: Vec<u64> = lost_packets
3071 .iter_range(range.clone())
3072 .map(|(pn, _info)| pn)
3073 .collect();
3074
3075 for pn in spurious_losses {
3076 lost_packets.remove(pn);
3077 }
3078 }
3079
3080 lost_packets.is_empty()
3085 }
3086
3087 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
3092 let two_pto = 2 * self.path_data(path).rtt.pto_base();
3093
3094 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
3095 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
3096 }
3097
3098 fn process_ecn(
3100 &mut self,
3101 now: Instant,
3102 space: SpaceId,
3103 path: PathId,
3104 newly_acked_pn: u64,
3105 ecn: frame::EcnCounts,
3106 largest_sent_time: Instant,
3107 largest_sent_pn: u64,
3108 ) {
3109 match self.spaces[space]
3110 .for_path(path)
3111 .detect_ecn(newly_acked_pn, ecn)
3112 {
3113 Err(e) => {
3114 debug!("halting ECN due to verification failure: {}", e);
3115
3116 self.path_data_mut(path).sending_ecn = false;
3117 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
3120 }
3121 Ok(false) => {}
3122 Ok(true) => {
3123 self.path_stats.for_path(path).congestion_events += 1;
3124 self.path_data_mut(path).congestion.on_congestion_event(
3125 now,
3126 largest_sent_time,
3127 false,
3128 true,
3129 0,
3130 largest_sent_pn,
3131 );
3132 }
3133 }
3134 }
3135
3136 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, pn: u64, info: SentPacket) {
3139 let path = self.path_data_mut(path_id);
3140 let app_limited = path.app_limited;
3141 path.remove_in_flight(&info);
3142 if info.ack_eliciting && info.path_generation == path.generation() {
3143 let rtt = path.rtt;
3147 path.congestion
3148 .on_ack(now, info.time_sent, info.size.into(), pn, app_limited, &rtt);
3149 }
3150
3151 if let Some(retransmits) = info.retransmits.get() {
3153 for (id, _) in retransmits.reset_stream.iter() {
3154 self.streams.reset_acked(*id);
3155 }
3156 }
3157
3158 for frame in info.stream_frames {
3159 self.streams.received_ack_of(frame);
3160 }
3161 }
3162
3163 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
3164 let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
3165 now
3166 } else {
3167 self.crypto_state
3168 .prev_crypto
3169 .as_ref()
3170 .expect("no previous keys")
3171 .end_packet
3172 .as_ref()
3173 .expect("update not acknowledged yet")
3174 .1
3175 };
3176
3177 self.timers.set(
3179 Timer::Conn(ConnTimer::KeyDiscard),
3180 start + self.max_pto_for_space(space) * 3,
3181 self.qlog.with_time(now),
3182 );
3183 }
3184
3185 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
3198 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
3199 self.detect_lost_packets(now, pn_space, path_id, false);
3201 self.set_loss_detection_timer(now, path_id);
3202 return;
3203 }
3204
3205 let Some((_, space)) = self.pto_time_and_space(now, path_id) else {
3206 error!(%path_id, "PTO expired while unset");
3207 return;
3208 };
3209 trace!(
3210 in_flight = self.path_data(path_id).in_flight.bytes,
3211 count = self.path_data(path_id).pto_count,
3212 ?space,
3213 %path_id,
3214 "PTO fired"
3215 );
3216
3217 let count = match self.path_data(path_id).in_flight.ack_eliciting {
3218 0 => {
3221 debug_assert!(!self.peer_completed_handshake_address_validation());
3222 1
3223 }
3224 _ => 2,
3226 };
3227 let pns = self.spaces[space].for_path(path_id);
3228 pns.loss_probes = pns.loss_probes.saturating_add(count);
3229 let path_data = self.path_data_mut(path_id);
3230 path_data.pto_count = path_data.pto_count.saturating_add(1);
3231 self.set_loss_detection_timer(now, path_id);
3232 }
3233
    /// Walks the packets sent on `path_id` in `pn_space` whose numbers are below the
    /// largest acknowledged one and hands those judged lost to `handle_lost_packets`.
    ///
    /// A packet is judged lost when it was sent at least `loss_delay` ago, or when the
    /// largest acknowledged packet number is at least `packet_threshold` ahead of it.
    /// For the first packet found that is not yet lost, the space's `loss_time` is set
    /// to the instant at which it will cross the time threshold.
    ///
    /// `due_to_ack` gates persistent-congestion tracking: it is only evaluated when
    /// this flag is `true`.
    ///
    /// # Panics
    ///
    /// Panics if the path has no largest acknowledged packet number recorded in
    /// `pn_space`.
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        // Cleared up front; re-populated below if a not-yet-lost packet is found.
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Time threshold: the conservative RTT scaled by the configured factor, but never
        // below the timer granularity.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet_pn = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet_pn
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // The persistent-congestion window is always derived from the Data-space PTO,
        // whatever `pn_space` is being scanned.
        let congestion_period = self
            .pto(SpaceKind::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        // The range end is exclusive: the largest acknowledged packet is not visited.
        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet_pn) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // Gap in the packet numbers still tracked as sent: restart the
                // persistent-congestion window.
                persistent_congestion_start = None;
            }

            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet_pn >= packet + packet_threshold {
                if Some(packet) == in_flight_mtu_probe {
                    // The in-flight MTU probe is reported separately and is kept out of
                    // `lost_packets` and the lost-byte total.
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // A window is open and this loss was sent more than
                            // `congestion_period` after its start.
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // No window yet: open one, but only for packets ordered after
                            // `first_packet_after_rtt_sample`.
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space.kind(), packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // Not lost yet. Only the first such packet sets `loss_time`.
                if space.loss_time.is_none() {
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
3350
3351 fn discard_path(&mut self, path_id: PathId, now: Instant) {
3353 trace!(%path_id, "dropping path state");
3354 let path = self.path_data(path_id);
3355 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3356
3357 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3359 .for_path(path_id)
3360 .sent_packets
3361 .iter()
3362 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3363 .map(|(pn, info)| {
3364 size_of_lost_packets += info.size as u64;
3365 pn
3366 })
3367 .collect();
3368
3369 if !lost_pns.is_empty() {
3370 trace!(
3371 %path_id,
3372 count = lost_pns.len(),
3373 lost_bytes = size_of_lost_packets,
3374 "packets lost on path abandon"
3375 );
3376 self.handle_lost_packets(
3377 SpaceId::Data,
3378 path_id,
3379 now,
3380 lost_pns,
3381 in_flight_mtu_probe,
3382 Duration::ZERO,
3383 false,
3384 size_of_lost_packets,
3385 );
3386 }
3387 let path_stats = self.path_stats.discard(&path_id);
3390 self.partial_stats += path_stats;
3391 self.paths.remove(&path_id);
3392 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3393
3394 self.events.push_back(
3395 PathEvent::Discarded {
3396 id: path_id,
3397 path_stats: Box::new(path_stats),
3398 }
3399 .into(),
3400 );
3401 }
3402
    /// Applies the consequences of packets having been declared lost on `path_id`.
    ///
    /// * `lost_packets` — packet numbers in `pn_space`, in ascending order (checked with
    ///   a `debug_assert!`).
    /// * `lost_mtu_probe` — packet number of a lost MTU probe, if any; it is always
    ///   looked up in the Data space and handled separately from `lost_packets`.
    /// * `loss_delay` — only forwarded to the qlog "packet lost" event.
    /// * `in_persistent_congestion` — forwarded to the congestion controller.
    /// * `size_of_lost_packets` — total size of `lost_packets`, added to the path stats
    ///   and forwarded to the congestion controller.
    ///
    /// # Panics
    ///
    /// Panics if the largest entry of `lost_packets` is no longer tracked as sent, if
    /// `path_id` is not in `self.paths`, or if `lost_mtu_probe` is no longer tracked as
    /// sent in the Data space.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(lost_packets.is_sorted(), "lost_packets must be sorted");

        // Age out old entries of the lost-packet record before adding new ones.
        self.drain_lost_packets(now, pn_space, path_id);

        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Remembered so that a change in in-flight bytes can be detected afterwards.
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.for_path(path_id);
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                // Packets that are no longer tracked as sent are silently skipped.
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the lost packet's contents for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                let path = self.path_data_mut(path_id);
                path.mtud.on_non_probe_lost(packet, info.size);
                path.congestion.on_packet_lost(info.size, packet, now);

                // Record the loss so that a later ACK covering it can be recognised
                // (see `detect_spurious_loss`).
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                // The MTU may have changed: tell the congestion controller and drop
                // queued datagrams that no longer fit.
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                if let Some(max_datagram_size) = self.datagrams().max_size()
                    && self.datagrams.drop_oversized(max_datagram_size)
                    && self.datagrams.send_blocked
                {
                    self.datagrams.send_blocked = false;
                    self.events.push_back(Event::DatagramsUnblocked);
                }
                self.path_stats.for_path(path_id).black_holes_detected += 1;
            }

            // A congestion event is only raised when the in-flight byte count changed
            // while processing the losses above.
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats.for_path(path_id).congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                    largest_lost,
                );
            }
        }

        if let Some(packet) = lost_mtu_probe {
            // NOTE(review): the unwrap relies on the probe still being tracked as sent,
            // i.e. on callers keeping it out of `lost_packets` — confirm for new callers.
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap();
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats.for_path(path_id).lost_plpmtud_probes += 1;
        }
    }
3509
3510 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3516 SpaceId::iter()
3517 .filter_map(|id| {
3518 self.spaces[id]
3519 .number_spaces
3520 .get(&path_id)
3521 .and_then(|pns| pns.loss_time)
3522 .map(|time| (time, id))
3523 })
3524 .min_by_key(|&(time, _)| time)
3525 }
3526
    /// Computes when the next probe timeout (PTO) for `path_id` is due and in which
    /// packet number space, or `None` if no PTO should be armed.
    ///
    /// Returns `None` when the path does not exist, or when no space qualifies in the
    /// loop below.
    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
        let path = self.path(path_id)?;
        let pto_count = path.pto_count;

        // Cap used below to limit how far the PTO can grow: 1.5x the RTT for slow paths,
        // otherwise one of two constants depending on the effective idle timeout (the
        // path's own, falling back to the connection's).
        let max_interval = if path.rtt.get() > SLOW_RTT_THRESHOLD {
            (path.rtt.get() * 3) / 2
        } else if let Some(idle) = path.idle_timeout.or(self.idle_timeout)
            && idle <= MIN_IDLE_FOR_FAST_PTO
        {
            MAX_PTO_FAST_INTERVAL
        } else {
            MAX_PTO_INTERVAL
        };

        // Special case for path zero: nothing ack-eliciting is in flight and the peer
        // has not yet completed address validation. A PTO is still armed, relative to
        // `now`, in the Handshake space if that is the highest space reached and in the
        // Initial space otherwise.
        if path_id == PathId::ZERO
            && path.in_flight.ack_eliciting == 0
            && !self.peer_completed_handshake_address_validation()
        {
            let space = match self.highest_space {
                SpaceKind::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };

            // Exponential back-off with a capped exponent, clamped to `max_interval`.
            let backoff = 2u32.pow(path.pto_count.min(MAX_BACKOFF_EXPONENT));
            let duration = path.rtt.pto_base() * backoff;
            let duration = duration.min(max_interval);
            return Some((now + duration, space));
        }

        // General case: pick the earliest PTO over all eligible spaces.
        let mut result = None;
        for space in SpaceId::iter() {
            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
                continue;
            };

            // The Data space is ignored until the handshake is confirmed.
            if space == SpaceId::Data && !self.is_handshake_confirmed() {
                continue;
            }

            if !pns.has_in_flight() {
                continue;
            }

            let duration = {
                // Only the Data space adds the peer's ack delay to the base PTO.
                let max_ack_delay = if space == SpaceId::Data {
                    self.ack_frequency.max_ack_delay_for_pto()
                } else {
                    Duration::ZERO
                };
                let pto_base = path.rtt.pto_base() + max_ack_delay;
                let mut duration = pto_base;
                // Replay the back-off once per PTO already fired: each step doubles the
                // base (capped exponent) but may grow by at most `max_interval` over the
                // previous step's value.
                for i in 1..=pto_count {
                    let exponential_duration = pto_base * 2u32.pow(i.min(MAX_BACKOFF_EXPONENT));
                    let max_duration = duration + max_interval;
                    duration = exponential_duration.min(max_duration);
                }
                duration
            };

            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
                continue;
            };
            // The PTO is measured from the last ack-eliciting packet sent in this space.
            let pto = last_ack_eliciting + duration;
            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
                // Skip the space when the path is anti-amplification blocked for a
                // 1-byte send.
                if path.anti_amplification_blocked(1) {
                    continue;
                }
                // Skip the space when the path has nothing ack-eliciting in flight.
                if path.in_flight.ack_eliciting == 0 {
                    continue;
                }
                result = Some((pto, space));
            }
        }
        result
    }
3630
3631 fn peer_completed_handshake_address_validation(&self) -> bool {
3633 if self.side.is_server() || self.state.is_closed() {
3634 return true;
3635 }
3636 self.spaces[SpaceId::Handshake]
3640 .path_space(PathId::ZERO)
3641 .and_then(|pns| pns.largest_acked_packet_pn)
3642 .is_some()
3643 || self.spaces[SpaceId::Data]
3644 .path_space(PathId::ZERO)
3645 .and_then(|pns| pns.largest_acked_packet_pn)
3646 .is_some()
3647 || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3648 && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3649 }
3650
3651 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3659 if self.state.is_closed() {
3660 return;
3664 }
3665
3666 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3667 self.timers.set(
3669 Timer::PerPath(path_id, PathTimer::LossDetection),
3670 loss_time,
3671 self.qlog.with_time(now),
3672 );
3673 return;
3674 }
3675
3676 if !self.abandoned_paths.contains(&path_id)
3679 && let Some((timeout, _)) = self.pto_time_and_space(now, path_id)
3680 {
3681 self.timers.set(
3682 Timer::PerPath(path_id, PathTimer::LossDetection),
3683 timeout,
3684 self.qlog.with_time(now),
3685 );
3686 } else {
3687 self.timers.stop(
3688 Timer::PerPath(path_id, PathTimer::LossDetection),
3689 self.qlog.with_time(now),
3690 );
3691 }
3692 }
3693
3694 fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3698 self.paths
3699 .keys()
3700 .map(|path_id| self.pto(space, *path_id))
3701 .max()
3702 .unwrap_or_else(|| {
3703 let rtt = self.config.initial_rtt;
3707 let max_ack_delay = match space {
3708 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3709 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3710 };
3711 rtt + cmp::max(4 * (rtt / 2), TIMER_GRANULARITY) + max_ack_delay
3712 })
3713 }
3714
3715 fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3720 let max_ack_delay = match space {
3721 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3722 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3723 };
3724 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3725 }
3726
    /// Updates connection and path state after a packet has been authenticated.
    ///
    /// * `space_id` / `path_id` — packet number space and path the packet belongs to.
    /// * `ecn` — ECN codepoint the packet arrived with, if any.
    /// * `packet_number` — `None` makes the function return after the timer and ECN
    ///   updates, before any side-specific handling or ACK scheduling.
    /// * `spin` — spin value taken from the packet; see the end of the function.
    /// * `is_1rtt` — whether the packet was a 1-RTT packet.
    /// * `remote` — the four-tuple the packet arrived on, compared against the path's
    ///   current network path.
    fn on_packet_authenticated(
        &mut self,
        now: Instant,
        space_id: SpaceKind,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet_number: Option<u64>,
        spin: bool,
        is_1rtt: bool,
        remote: &FourTuple,
    ) {
        // ECN accounting and the spin update below are limited to packets that appear
        // to have arrived on the path's current four-tuple.
        let is_on_path = self
            .path_data(path_id)
            .network_path
            .is_probably_same_path(remote);

        self.total_authed_packets += 1;
        self.reset_keep_alive(path_id, now);
        self.reset_idle_timeout(now, space_id, path_id);
        self.path_data_mut(path_id).permit_idle_reset = true;

        if is_on_path {
            self.receiving_ecn |= ecn.is_some();
            if let Some(x) = ecn {
                let space = &mut self.spaces[space_id];
                space.for_path(path_id).ecn_counters += x;

                // A CE mark requests an immediate ACK.
                if x.is_ce() {
                    space
                        .for_path(path_id)
                        .pending_acks
                        .set_immediate_ack_required();
                }
            }
        }

        let Some(packet_number) = packet_number else {
            return;
        };
        match &self.side {
            ConnectionSide::Client { .. } => {
                // Receiving a Handshake packet while still in the handshake state turns
                // off `allow_server_migration`.
                if space_id == SpaceKind::Handshake
                    && let Some(hs) = self.state.as_handshake_mut()
                {
                    hs.allow_server_migration = false;
                }
            }
            ConnectionSide::Server { .. } => {
                // First Handshake packet while Initial keys are still held: drop the
                // Initial space.
                if self.crypto_state.has_keys(EncryptionLevel::Initial)
                    && space_id == SpaceKind::Handshake
                {
                    self.discard_space(now, SpaceKind::Initial);
                }
                // A 1-RTT packet while 0-RTT keys are still held: arm the key discard
                // timer.
                if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
                    self.set_key_discard_timer(now, space_id)
                }
            }
        }
        let space = self.spaces[space_id].for_path(path_id);

        // Schedule an acknowledgement and track the largest packet number received.
        space.pending_acks.insert_one(packet_number, now);
        if packet_number >= space.largest_received_packet_number.unwrap_or_default() {
            space.largest_received_packet_number = Some(packet_number);

            if is_on_path {
                // The stored spin is the received value, inverted iff we are the client.
                self.spin = self.side.is_client() ^ spin;
            }
        }
    }
3810
3811 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3816 if let Some(timeout) = self.idle_timeout {
3818 if self.state.is_closed() {
3819 self.timers
3820 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3821 } else {
3822 let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3823 self.timers.set(
3824 Timer::Conn(ConnTimer::Idle),
3825 now + dt,
3826 self.qlog.with_time(now),
3827 );
3828 }
3829 }
3830
3831 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3833 if self.state.is_closed() {
3834 self.timers.stop(
3835 Timer::PerPath(path_id, PathTimer::PathIdle),
3836 self.qlog.with_time(now),
3837 );
3838 } else {
3839 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3840 self.timers.set(
3841 Timer::PerPath(path_id, PathTimer::PathIdle),
3842 now + dt,
3843 self.qlog.with_time(now),
3844 );
3845 }
3846 }
3847 }
3848
3849 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3851 if !self.state.is_established() {
3852 return;
3853 }
3854
3855 if let Some(interval) = self.config.keep_alive_interval {
3856 self.timers.set(
3857 Timer::Conn(ConnTimer::KeepAlive),
3858 now + interval,
3859 self.qlog.with_time(now),
3860 );
3861 }
3862
3863 if let Some(interval) = self.path_data(path_id).keep_alive {
3864 self.timers.set(
3865 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3866 now + interval,
3867 self.qlog.with_time(now),
3868 );
3869 }
3870 }
3871
3872 fn reset_cid_retirement(&mut self, now: Instant) {
3874 if let Some((_path, t)) = self.next_cid_retirement() {
3875 self.timers.set(
3876 Timer::Conn(ConnTimer::PushNewCid),
3877 t,
3878 self.qlog.with_time(now),
3879 );
3880 }
3881 }
3882
3883 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3885 self.local_cid_state
3886 .iter()
3887 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3888 .min_by_key(|(_path_id, timeout)| *timeout)
3889 }
3890
3891 pub(crate) fn handle_first_packet(
3896 &mut self,
3897 now: Instant,
3898 network_path: FourTuple,
3899 ecn: Option<EcnCodepoint>,
3900 packet_number: u64,
3901 packet: InitialPacket,
3902 remaining: Option<BytesMut>,
3903 ) -> Result<(), ConnectionError> {
3904 let span = trace_span!("first recv");
3905 let _guard = span.enter();
3906 debug_assert!(self.side.is_server());
3907 let len = packet.header_data.len() + packet.payload.len();
3908 let path_id = PathId::ZERO;
3909 self.path_data_mut(path_id).total_recvd = len as u64;
3910
3911 if let Some(hs) = self.state.as_handshake_mut() {
3912 hs.expected_token = packet.header.token.clone();
3913 } else {
3914 unreachable!("first packet must be delivered in Handshake state");
3915 }
3916
3917 self.on_packet_authenticated(
3919 now,
3920 SpaceKind::Initial,
3921 path_id,
3922 ecn,
3923 Some(packet_number),
3924 false,
3925 false,
3926 &network_path,
3927 );
3928
3929 let packet: Packet = packet.into();
3930
3931 let mut qlog = QlogRecvPacket::new(len);
3932 qlog.header(&packet.header, Some(packet_number), path_id);
3933
3934 self.process_decrypted_packet(
3935 now,
3936 network_path,
3937 path_id,
3938 Some(packet_number),
3939 packet,
3940 &mut qlog,
3941 )?;
3942 self.qlog.emit_packet_received(qlog, now);
3943 if let Some(data) = remaining {
3944 self.handle_coalesced(now, network_path, path_id, ecn, data);
3945 }
3946
3947 self.qlog.emit_recovery_metrics(
3948 path_id,
3949 &mut self
3950 .paths
3951 .get_mut(&path_id)
3952 .expect("path_id was supplied by the caller for an active path")
3953 .data,
3954 now,
3955 );
3956
3957 Ok(())
3958 }
3959
3960 fn init_0rtt(&mut self, now: Instant) {
3961 let Some((header, packet)) = self.crypto_state.session.early_crypto() else {
3962 return;
3963 };
3964 if self.side.is_client() {
3965 match self.crypto_state.session.transport_parameters() {
3966 Ok(params) => {
3967 let params = params
3968 .expect("crypto layer didn't supply transport parameters with ticket");
3969 let params = TransportParameters {
3971 initial_src_cid: None,
3972 original_dst_cid: None,
3973 preferred_address: None,
3974 retry_src_cid: None,
3975 stateless_reset_token: None,
3976 min_ack_delay: None,
3977 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3978 max_ack_delay: TransportParameters::default().max_ack_delay,
3979 initial_max_path_id: None,
3980 ..params
3981 };
3982 self.set_peer_params(params);
3983 self.qlog.emit_peer_transport_params_restored(self, now);
3984 }
3985 Err(e) => {
3986 error!("session ticket has malformed transport parameters: {}", e);
3987 return;
3988 }
3989 }
3990 }
3991 trace!("0-RTT enabled");
3992 self.crypto_state.enable_zero_rtt(header, packet);
3993 }
3994
3995 fn read_crypto(
3996 &mut self,
3997 space: SpaceId,
3998 crypto: &frame::Crypto,
3999 payload_len: usize,
4000 ) -> Result<(), TransportError> {
4001 let expected = if !self.state.is_handshake() {
4002 SpaceId::Data
4003 } else if self.highest_space == SpaceKind::Initial {
4004 SpaceId::Initial
4005 } else {
4006 SpaceId::Handshake
4009 };
4010 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
4014
4015 let end = crypto.offset + crypto.data.len() as u64;
4016 if space < expected
4017 && end
4018 > self.crypto_state.spaces[space.kind()]
4019 .crypto_stream
4020 .bytes_read()
4021 {
4022 warn!(
4023 "received new {:?} CRYPTO data when expecting {:?}",
4024 space, expected
4025 );
4026 return Err(TransportError::PROTOCOL_VIOLATION(
4027 "new data at unexpected encryption level",
4028 ));
4029 }
4030
4031 let crypto_space = &mut self.crypto_state.spaces[space.kind()];
4032 let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
4033 if max > self.config.crypto_buffer_size as u64 {
4034 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
4035 }
4036
4037 crypto_space
4038 .crypto_stream
4039 .insert(crypto.offset, crypto.data.clone(), payload_len);
4040 while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
4041 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
4042 if self.crypto_state.session.read_handshake(&chunk.bytes)? {
4043 self.events.push_back(Event::HandshakeDataReady);
4044 }
4045 }
4046
4047 Ok(())
4048 }
4049
4050 fn write_crypto(&mut self) {
4051 loop {
4052 let space = self.highest_space;
4053 let mut outgoing = Vec::new();
4054 if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
4055 match space {
4056 SpaceKind::Initial => {
4057 self.upgrade_crypto(SpaceKind::Handshake, crypto);
4058 }
4059 SpaceKind::Handshake => {
4060 self.upgrade_crypto(SpaceKind::Data, crypto);
4061 }
4062 SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
4063 }
4064 }
4065 if outgoing.is_empty() {
4066 if space == self.highest_space {
4067 break;
4068 } else {
4069 continue;
4071 }
4072 }
4073 let offset = self.crypto_state.spaces[space].crypto_offset;
4074 let outgoing = Bytes::from(outgoing);
4075 if let Some(hs) = self.state.as_handshake_mut()
4076 && space == SpaceKind::Initial
4077 && offset == 0
4078 && self.side.is_client()
4079 {
4080 hs.client_hello = Some(outgoing.clone());
4081 }
4082 self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
4083 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
4084 self.spaces[space].pending.crypto.push_back(frame::Crypto {
4085 offset,
4086 data: outgoing,
4087 });
4088 }
4089 }
4090
4091 fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
4093 debug_assert!(
4094 !self.crypto_state.has_keys(space.encryption_level()),
4095 "already reached packet space {space:?}"
4096 );
4097 trace!("{:?} keys ready", space);
4098 if space == SpaceKind::Data {
4099 self.crypto_state.next_crypto = Some(
4101 self.crypto_state
4102 .session
4103 .next_1rtt_keys()
4104 .expect("handshake should be complete"),
4105 );
4106 }
4107
4108 self.crypto_state.spaces[space].keys = Some(crypto);
4109 debug_assert!(space > self.highest_space);
4110 self.highest_space = space;
4111 if space == SpaceKind::Data && self.side.is_client() {
4112 self.crypto_state.discard_zero_rtt();
4114 }
4115 }
4116
4117 fn discard_space(&mut self, now: Instant, space: SpaceKind) {
4118 debug_assert!(space != SpaceKind::Data);
4119 trace!("discarding {:?} keys", space);
4120 if space == SpaceKind::Initial {
4121 if let ConnectionSide::Client { token, .. } = &mut self.side {
4123 *token = Bytes::new();
4124 }
4125 }
4126 self.crypto_state.spaces[space].keys = None;
4127 let space = &mut self.spaces[space];
4128 let pns = space.for_path(PathId::ZERO);
4129 pns.time_of_last_ack_eliciting_packet = None;
4130 pns.loss_time = None;
4131 pns.loss_probes = 0;
4132 let sent_packets = mem::take(&mut pns.sent_packets);
4133 let path = self
4134 .paths
4135 .get_mut(&PathId::ZERO)
4136 .expect("PathId::ZERO is alive while Initial/Handshake spaces exist");
4137 for (_, packet) in sent_packets.into_iter() {
4138 path.data.remove_in_flight(&packet);
4139 }
4140
4141 self.set_loss_detection_timer(now, PathId::ZERO)
4142 }
4143
4144 fn handle_coalesced(
4145 &mut self,
4146 now: Instant,
4147 network_path: FourTuple,
4148 path_id: PathId,
4149 ecn: Option<EcnCodepoint>,
4150 data: BytesMut,
4151 ) {
4152 self.path_data_mut(path_id)
4153 .inc_total_recvd(data.len() as u64);
4154 let mut remaining = Some(data);
4155 let cid_len = self
4156 .local_cid_state
4157 .values()
4158 .map(|cid_state| cid_state.cid_len())
4159 .next()
4160 .expect("one cid_state must exist");
4161 while let Some(data) = remaining {
4162 match PartialDecode::new(
4163 data,
4164 &FixedLengthConnectionIdParser::new(cid_len),
4165 &[self.version],
4166 self.endpoint_config.grease_quic_bit,
4167 ) {
4168 Ok((partial_decode, rest)) => {
4169 remaining = rest;
4170 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
4171 }
4172 Err(e) => {
4173 trace!("malformed header: {}", e);
4174 return;
4175 }
4176 }
4177 }
4178 }
4179
4180 fn handle_decode(
4186 &mut self,
4187 now: Instant,
4188 network_path: FourTuple,
4189 path_id: PathId,
4190 ecn: Option<EcnCodepoint>,
4191 partial_decode: PartialDecode,
4192 ) {
4193 let qlog = QlogRecvPacket::new(partial_decode.len());
4194 if let Some(decoded) = self
4195 .crypto_state
4196 .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
4197 {
4198 self.handle_packet(
4199 now,
4200 network_path,
4201 path_id,
4202 ecn,
4203 decoded.packet,
4204 decoded.stateless_reset,
4205 qlog,
4206 );
4207 }
4208 }
4209
4210 fn handle_packet(
4217 &mut self,
4218 now: Instant,
4219 network_path: FourTuple,
4220 path_id: PathId,
4221 ecn: Option<EcnCodepoint>,
4222 packet: Option<Packet>,
4223 stateless_reset: bool,
4224 mut qlog: QlogRecvPacket,
4225 ) {
4226 self.path_stats.for_path(path_id).udp_rx.ios += 1;
4227
4228 if let Some(ref packet) = packet {
4229 trace!(
4230 "got {:?} packet ({} bytes) from {} using id {}",
4231 packet.header.space(),
4232 packet.payload.len() + packet.header_data.len(),
4233 network_path,
4234 packet.header.dst_cid(),
4235 );
4236 }
4237
4238 let was_closed = self.state.is_closed();
4239 let was_drained = self.state.is_drained();
4240
4241 let decrypted = match packet {
4243 None => Err(None),
4244 Some(mut packet) => self
4245 .decrypt_packet(now, path_id, &mut packet)
4246 .map(move |number| (packet, number)),
4247 };
4248 let result = match decrypted {
4249 _ if stateless_reset => {
4250 debug!("got stateless reset");
4251 Err(ConnectionError::Reset)
4252 }
4253 Err(Some(e)) => {
4254 warn!("illegal packet: {}", e);
4255 Err(e.into())
4256 }
4257 Err(None) => {
4258 debug!("failed to authenticate packet");
4259 self.authentication_failures += 1;
4260 let integrity_limit = self
4261 .crypto_state
4262 .integrity_limit(self.highest_space)
4263 .unwrap();
4264 if self.authentication_failures > integrity_limit {
4265 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
4266 } else {
4267 return;
4268 }
4269 }
4270 Ok((packet, pn)) => {
4271 qlog.header(&packet.header, pn, path_id);
4273 let span = match pn {
4274 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
4275 None => trace_span!("recv", space = ?packet.header.space()),
4276 };
4277 let _guard = span.enter();
4278
4279 if self.is_handshaking()
4287 && self
4288 .path(path_id)
4289 .map(|path_data| {
4290 !path_data.network_path.is_probably_same_path(&network_path)
4291 })
4292 .unwrap_or(false)
4293 {
4294 if let Some(hs) = self.state.as_handshake()
4295 && hs.allow_server_migration
4296 {
4297 trace!(
4298 %network_path,
4299 prev = %self.path_data(path_id).network_path,
4300 "server migrated to new remote",
4301 );
4302 self.path_data_mut(path_id).network_path = network_path;
4303 self.qlog.emit_tuple_assigned(path_id, network_path, now);
4304 } else {
4305 debug!(
4306 recv_path = %network_path,
4307 expected_path = %self.path_data_mut(path_id).network_path,
4308 "discarding packet with unexpected remote during handshake",
4309 );
4310 return;
4311 }
4312 }
4313
4314 let dedup = self.spaces[packet.header.space()]
4315 .path_space_mut(path_id)
4316 .map(|pns| &mut pns.dedup);
4317 if pn.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
4318 debug!("discarding possible duplicate packet");
4319 self.qlog.emit_packet_received(qlog, now);
4320 return;
4321 } else if self.state.is_handshake() && packet.header.is_short() {
4322 trace!("dropping short packet during handshake");
4324 self.qlog.emit_packet_received(qlog, now);
4325 return;
4326 } else {
4327 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
4328 && let Some(hs) = self.state.as_handshake()
4329 && self.side.is_server()
4330 && token != &hs.expected_token
4331 {
4332 warn!("discarding Initial with invalid retry token");
4336 self.qlog.emit_packet_received(qlog, now);
4337 return;
4338 }
4339
4340 if !self.state.is_closed() {
4341 let spin = match packet.header {
4342 Header::Short { spin, .. } => spin,
4343 _ => false,
4344 };
4345
4346 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
4347 self.ensure_path(path_id, network_path, now, pn);
4349 }
4350 if self.paths.contains_key(&path_id) {
4351 self.on_packet_authenticated(
4352 now,
4353 packet.header.space(),
4354 path_id,
4355 ecn,
4356 pn,
4357 spin,
4358 packet.header.is_1rtt(),
4359 &network_path,
4360 );
4361 }
4362 }
4363
4364 let res = self.process_decrypted_packet(
4365 now,
4366 network_path,
4367 path_id,
4368 pn,
4369 packet,
4370 &mut qlog,
4371 );
4372
4373 self.qlog.emit_packet_received(qlog, now);
4374 res
4375 }
4376 }
4377 };
4378
4379 if let Err(conn_err) = result {
4381 match conn_err {
4382 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
4383 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
4384 ConnectionError::Reset
4385 | ConnectionError::TransportError(TransportError {
4386 code: TransportErrorCode::AEAD_LIMIT_REACHED,
4387 ..
4388 }) => {
4389 let was_draining = self.state.move_to_drained(Some(conn_err));
4390 if !was_draining {
4391 self.endpoint_events.push_back(EndpointEventInner::Draining);
4392 }
4393 }
4394 ConnectionError::TimedOut => {
4395 unreachable!("timeouts aren't generated by packet processing");
4396 }
4397 ConnectionError::TransportError(err) => {
4398 debug!("closing connection due to transport error: {}", err);
4399 self.state.move_to_closed(err);
4400 }
4401 ConnectionError::VersionMismatch => {
4402 self.state.move_to_draining(Some(conn_err));
4403 self.endpoint_events.push_back(EndpointEventInner::Draining);
4404 }
4405 ConnectionError::LocallyClosed => {
4406 unreachable!("LocallyClosed isn't generated by packet processing");
4407 }
4408 ConnectionError::CidsExhausted => {
4409 unreachable!("CidsExhausted isn't generated by packet processing");
4410 }
4411 };
4412 }
4413
4414 if !was_closed && self.state.is_closed() {
4415 self.close_common();
4416 if !self.state.is_drained() {
4417 self.set_close_timer(now);
4418 }
4419 }
4420 if !was_drained && self.state.is_drained() {
4421 self.endpoint_events.push_back(EndpointEventInner::Drained);
4422 self.timers
4425 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
4426 }
4427
4428 if matches!(self.state.as_type(), StateType::Closed) {
4435 if self
4453 .paths
4454 .get(&path_id)
4455 .map(|p| p.data.validated && p.data.network_path == network_path)
4456 .unwrap_or(false)
4457 {
4458 self.connection_close_pending = true;
4459 }
4460 }
4461 }
4462
4463 fn process_decrypted_packet(
4464 &mut self,
4465 now: Instant,
4466 network_path: FourTuple,
4467 path_id: PathId,
4468 number: Option<u64>,
4469 packet: Packet,
4470 qlog: &mut QlogRecvPacket,
4471 ) -> Result<(), ConnectionError> {
4472 if !self.paths.contains_key(&path_id) {
4473 trace!(%path_id, ?number, "discarding packet for unknown path");
4477 return Ok(());
4478 }
4479 let state = match self.state.as_type() {
4480 StateType::Established => {
4481 match packet.header.space() {
4482 SpaceKind::Data => self.process_payload(
4483 now,
4484 network_path,
4485 path_id,
4486 number.unwrap(),
4487 packet,
4488 qlog,
4489 )?,
4490 _ if packet.header.has_frames() => {
4491 self.process_early_payload(now, path_id, packet, qlog)?
4492 }
4493 _ => {
4494 trace!("discarding unexpected pre-handshake packet");
4495 }
4496 }
4497 return Ok(());
4498 }
4499 StateType::Closed => {
4500 for result in frame::Iter::new(packet.payload.freeze())? {
4501 let frame = match result {
4502 Ok(frame) => frame,
4503 Err(err) => {
4504 debug!("frame decoding error: {err:?}");
4505 continue;
4506 }
4507 };
4508 qlog.frame(&frame);
4509
4510 if let Frame::Padding = frame {
4511 continue;
4512 };
4513
4514 trace!(?frame, "processing frame in closed state");
4515
4516 self.path_stats
4517 .for_path(path_id)
4518 .frame_rx
4519 .record(frame.ty());
4520
4521 if let Frame::Close(_error) = frame {
4522 self.state.move_to_draining(None);
4523 self.endpoint_events.push_back(EndpointEventInner::Draining);
4524 break;
4525 }
4526 }
4527 return Ok(());
4528 }
4529 StateType::Draining | StateType::Drained => return Ok(()),
4530 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4531 };
4532
4533 match packet.header {
4534 Header::Retry {
4535 src_cid: remote_cid,
4536 ..
4537 } => {
4538 debug_assert_eq!(path_id, PathId::ZERO);
4539 if self.side.is_server() {
4540 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4541 }
4542
4543 let is_valid_retry = self
4544 .remote_cids
4545 .get(&path_id)
4546 .map(|cids| cids.active())
4547 .map(|orig_dst_cid| {
4548 self.crypto_state.session.is_valid_retry(
4549 orig_dst_cid,
4550 &packet.header_data,
4551 &packet.payload,
4552 )
4553 })
4554 .unwrap_or_default();
4555 if self.total_authed_packets > 1
4556 || packet.payload.len() <= 16 || !is_valid_retry
4558 {
4559 trace!("discarding invalid Retry");
4560 return Ok(());
4568 }
4569
4570 trace!("retrying with CID {}", remote_cid);
4571 let client_hello = state.client_hello.take().unwrap();
4572 self.retry_src_cid = Some(remote_cid);
4573 self.remote_cids
4574 .get_mut(&path_id)
4575 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4576 .update_initial_cid(remote_cid);
4577 self.remote_handshake_cid = remote_cid;
4578
4579 let space = &mut self.spaces[SpaceId::Initial];
4580 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4581 self.on_packet_acked(now, PathId::ZERO, 0, info);
4582 };
4583
4584 self.discard_space(now, SpaceKind::Initial); let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4587 crypto_space.keys = Some(
4588 self.crypto_state
4589 .session
4590 .initial_keys(remote_cid, self.side.side()),
4591 );
4592 crypto_space.crypto_offset = client_hello.len() as u64;
4593
4594 let next_pn = self.spaces[SpaceId::Initial]
4595 .for_path(path_id)
4596 .next_packet_number;
4597 self.spaces[SpaceId::Initial] = {
4598 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4599 space.for_path(path_id).next_packet_number = next_pn;
4600 space.pending.crypto.push_back(frame::Crypto {
4601 offset: 0,
4602 data: client_hello,
4603 });
4604 space
4605 };
4606
4607 let zero_rtt = mem::take(
4609 &mut self.spaces[SpaceId::Data]
4610 .for_path(PathId::ZERO)
4611 .sent_packets,
4612 );
4613 for (_, info) in zero_rtt.into_iter() {
4614 self.paths
4615 .get_mut(&PathId::ZERO)
4616 .unwrap()
4617 .remove_in_flight(&info);
4618 self.spaces[SpaceId::Data].pending |= info.retransmits;
4619 }
4620 self.streams.retransmit_all_for_0rtt();
4621
4622 let token_len = packet.payload.len() - 16;
4623 let ConnectionSide::Client { ref mut token, .. } = self.side else {
4624 unreachable!("we already short-circuited if we're server");
4625 };
4626 *token = packet.payload.freeze().split_to(token_len);
4627
4628 self.state = State::handshake(state::Handshake {
4629 expected_token: Bytes::new(),
4630 remote_cid_set: false,
4631 client_hello: None,
4632 allow_server_migration: true,
4633 });
4634 Ok(())
4635 }
4636 Header::Long {
4637 ty: LongType::Handshake,
4638 src_cid: remote_cid,
4639 dst_cid: local_cid,
4640 ..
4641 } => {
4642 debug_assert_eq!(path_id, PathId::ZERO);
4643 if remote_cid != self.remote_handshake_cid {
4644 debug!(
4645 "discarding packet with mismatched remote CID: {} != {}",
4646 self.remote_handshake_cid, remote_cid
4647 );
4648 return Ok(());
4649 }
4650 self.on_path_validated(path_id);
4651
4652 self.process_early_payload(now, path_id, packet, qlog)?;
4653 if self.state.is_closed() {
4654 return Ok(());
4655 }
4656
4657 if self.crypto_state.session.is_handshaking() {
4658 trace!("handshake ongoing");
4659 return Ok(());
4660 }
4661
4662 if self.side.is_client() {
4663 let params = self
4665 .crypto_state
4666 .session
4667 .transport_parameters()?
4668 .ok_or_else(|| {
4669 TransportError::new(
4670 TransportErrorCode::crypto(0x6d),
4671 "transport parameters missing".to_owned(),
4672 )
4673 })?;
4674
4675 if self.has_0rtt() {
4676 if !self.crypto_state.session.early_data_accepted().unwrap() {
4677 debug_assert!(self.side.is_client());
4678 debug!("0-RTT rejected");
4679 self.crypto_state.accepted_0rtt = false;
4680 self.streams.zero_rtt_rejected();
4681
4682 self.spaces[SpaceId::Data].pending = Retransmits::default();
4684
4685 let sent_packets = mem::take(
4687 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4688 );
4689 for (_, packet) in sent_packets.into_iter() {
4690 self.paths
4691 .get_mut(&path_id)
4692 .unwrap()
4693 .remove_in_flight(&packet);
4694 }
4695 } else {
4696 self.crypto_state.accepted_0rtt = true;
4697 params.validate_resumption_from(&self.peer_params)?;
4698 }
4699 }
4700 if let Some(token) = params.stateless_reset_token {
4701 let remote = self.path_data(path_id).network_path.remote;
4702 debug_assert!(!self.state.is_drained()); self.endpoint_events
4704 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4705 }
4706 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4707 self.issue_first_cids(now);
4708 } else {
4709 self.spaces[SpaceId::Data].pending.handshake_done = true;
4711 self.discard_space(now, SpaceKind::Handshake);
4712 self.events.push_back(Event::HandshakeConfirmed);
4713 trace!("handshake confirmed");
4714 }
4715
4716 self.events.push_back(Event::Connected);
4717 self.state.move_to_established();
4718 trace!("established");
4719
4720 self.issue_first_path_cids(now);
4723 Ok(())
4724 }
4725 Header::Initial(InitialHeader {
4726 src_cid: remote_cid,
4727 dst_cid: local_cid,
4728 ..
4729 }) => {
4730 debug_assert_eq!(path_id, PathId::ZERO);
4731 if !state.remote_cid_set {
4732 trace!("switching remote CID to {}", remote_cid);
4733 let mut state = state.clone();
4734 self.remote_cids
4735 .get_mut(&path_id)
4736 .expect("PathId::ZERO not yet abandoned")
4737 .update_initial_cid(remote_cid);
4738 self.remote_handshake_cid = remote_cid;
4739 self.original_remote_cid = remote_cid;
4740 state.remote_cid_set = true;
4741 self.state.move_to_handshake(state);
4742 } else if remote_cid != self.remote_handshake_cid {
4743 debug!(
4744 "discarding packet with mismatched remote CID: {} != {}",
4745 self.remote_handshake_cid, remote_cid
4746 );
4747 return Ok(());
4748 }
4749
4750 let starting_space = self.highest_space;
4751 self.process_early_payload(now, path_id, packet, qlog)?;
4752
4753 if self.side.is_server()
4754 && starting_space == SpaceKind::Initial
4755 && self.highest_space != SpaceKind::Initial
4756 {
4757 let params = self
4758 .crypto_state
4759 .session
4760 .transport_parameters()?
4761 .ok_or_else(|| {
4762 TransportError::new(
4763 TransportErrorCode::crypto(0x6d),
4764 "transport parameters missing".to_owned(),
4765 )
4766 })?;
4767 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4768 self.issue_first_cids(now);
4769 self.init_0rtt(now);
4770 }
4771 Ok(())
4772 }
4773 Header::Long {
4774 ty: LongType::ZeroRtt,
4775 ..
4776 } => {
4777 self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4778 Ok(())
4779 }
4780 Header::VersionNegotiate { .. } => {
4781 if self.total_authed_packets > 1 {
4782 return Ok(());
4783 }
4784 let supported = packet
4785 .payload
4786 .chunks(4)
4787 .any(|x| match <[u8; 4]>::try_from(x) {
4788 Ok(version) => self.version == u32::from_be_bytes(version),
4789 Err(_) => false,
4790 });
4791 if supported {
4792 return Ok(());
4793 }
4794 debug!("remote doesn't support our version");
4795 Err(ConnectionError::VersionMismatch)
4796 }
4797 Header::Short { .. } => unreachable!(
4798 "short packets received during handshake are discarded in handle_packet"
4799 ),
4800 }
4801 }
4802
4803 fn process_early_payload(
4805 &mut self,
4806 now: Instant,
4807 path_id: PathId,
4808 packet: Packet,
4809 #[allow(unused)] qlog: &mut QlogRecvPacket,
4810 ) -> Result<(), TransportError> {
4811 debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4812 debug_assert_eq!(path_id, PathId::ZERO);
4813 let payload_len = packet.payload.len();
4814 let mut ack_eliciting = false;
4815 for result in frame::Iter::new(packet.payload.freeze())? {
4816 let frame = result?;
4817 qlog.frame(&frame);
4818 let span = match frame {
4819 Frame::Padding => continue,
4820 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4821 };
4822
4823 self.path_stats
4824 .for_path(path_id)
4825 .frame_rx
4826 .record(frame.ty());
4827
4828 let _guard = span.as_ref().map(|x| x.enter());
4829 ack_eliciting |= frame.is_ack_eliciting();
4830
4831 if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4833 return Err(TransportError::PROTOCOL_VIOLATION(
4834 "illegal frame type in handshake",
4835 ));
4836 }
4837
4838 match frame {
4839 Frame::Padding | Frame::Ping => {}
4840 Frame::Crypto(frame) => {
4841 self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4842 }
4843 Frame::Ack(ack) => {
4844 self.on_ack_received(now, packet.header.space().into(), ack)?;
4845 }
4846 Frame::PathAck(ack) => {
4847 span.as_ref()
4848 .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4849 self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4850 }
4851 Frame::Close(reason) => {
4852 self.state.move_to_draining(Some(reason.into()));
4853 self.endpoint_events.push_back(EndpointEventInner::Draining);
4854 return Ok(());
4855 }
4856 _ => {
4857 let mut err =
4858 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4859 err.frame = frame::MaybeFrame::Known(frame.ty());
4860 return Err(err);
4861 }
4862 }
4863 }
4864
4865 if ack_eliciting {
4866 self.spaces[packet.header.space()]
4868 .for_path(path_id)
4869 .pending_acks
4870 .set_immediate_ack_required();
4871 }
4872
4873 self.write_crypto();
4874 Ok(())
4875 }
4876
4877 fn process_payload(
4879 &mut self,
4880 now: Instant,
4881 network_path: FourTuple,
4882 path_id: PathId,
4883 number: u64,
4884 packet: Packet,
4885 #[allow(unused)] qlog: &mut QlogRecvPacket,
4886 ) -> Result<(), TransportError> {
4887 let is_multipath_negotiated = self.is_multipath_negotiated();
4888 let payload = packet.payload.freeze();
4889 let mut is_probing_packet = true;
4890 let mut close = None;
4891 let payload_len = payload.len();
4892 let mut ack_eliciting = false;
4893 let mut migration_observed_addr = None;
4896 for result in frame::Iter::new(payload)? {
4897 let frame = result?;
4898 qlog.frame(&frame);
4899 let span = match frame {
4900 Frame::Padding => continue,
4901 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4902 };
4903
4904 self.path_stats
4905 .for_path(path_id)
4906 .frame_rx
4907 .record(frame.ty());
4908 match &frame {
4911 Frame::Crypto(f) => {
4912 trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4913 }
4914 Frame::Stream(f) => {
4915 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4916 }
4917 Frame::Datagram(f) => {
4918 trace!(len = f.data.len(), "got frame DATAGRAM");
4919 }
4920 f => {
4921 trace!("got frame {f}");
4922 }
4923 }
4924
4925 let _guard = span.enter();
4926 if packet.header.is_0rtt() {
4927 match frame {
4928 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4929 return Err(TransportError::PROTOCOL_VIOLATION(
4930 "illegal frame type in 0-RTT",
4931 ));
4932 }
4933 _ => {
4934 if frame.is_1rtt() {
4935 return Err(TransportError::PROTOCOL_VIOLATION(
4936 "illegal frame type in 0-RTT",
4937 ));
4938 }
4939 }
4940 }
4941 }
4942 ack_eliciting |= frame.is_ack_eliciting();
4943
4944 match frame {
4946 Frame::Padding
4947 | Frame::PathChallenge(_)
4948 | Frame::PathResponse(_)
4949 | Frame::NewConnectionId(_)
4950 | Frame::ObservedAddr(_) => {}
4951 _ => {
4952 is_probing_packet = false;
4953 }
4954 }
4955
4956 match frame {
4957 Frame::Crypto(frame) => {
4958 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4959 }
4960 Frame::Stream(frame) => {
4961 if self.streams.received(frame, payload_len)?.should_transmit() {
4962 self.spaces[SpaceId::Data].pending.max_data = true;
4963 }
4964 }
4965 Frame::Ack(ack) => {
4966 self.on_ack_received(now, SpaceId::Data, ack)?;
4967 }
4968 Frame::PathAck(ack) => {
4969 if !self.is_multipath_negotiated() {
4970 return Err(TransportError::PROTOCOL_VIOLATION(
4971 "received PATH_ACK frame when multipath was not negotiated",
4972 ));
4973 }
4974 span.record("path", tracing::field::display(&ack.path_id));
4975 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4976 }
4977 Frame::Padding | Frame::Ping => {}
4978 Frame::Close(reason) => {
4979 close = Some(reason);
4980 }
4981 Frame::PathChallenge(challenge) => {
4982 let path = &mut self
4983 .path_mut(path_id)
4984 .expect("payload is processed only after the path becomes known");
4985 path.path_responses.push(number, challenge.0, network_path);
4986 if network_path.remote == path.network_path.remote {
4990 match self.peer_supports_ack_frequency() {
4998 true => self.immediate_ack(path_id),
4999 false => {
5000 self.ping_path(path_id).ok();
5001 }
5002 }
5003 }
5004 }
5005 Frame::PathResponse(response) => {
5006 if self
5008 .n0_nat_traversal
5009 .handle_path_response(network_path, response.0)
5010 {
5011 self.open_nat_traversed_paths(now);
5012 } else {
5013 let path = self
5016 .paths
5017 .get_mut(&path_id)
5018 .expect("payload is processed only after the path becomes known");
5019
5020 use PathTimer::*;
5021 use paths::OnPathResponseReceived::*;
5022 match path
5023 .data
5024 .on_path_response_received(now, response.0, network_path)
5025 {
5026 OnPath { was_open } => {
5027 let qlog = self.qlog.with_time(now);
5028
5029 self.timers.stop(
5030 Timer::PerPath(path_id, PathValidationFailed),
5031 qlog.clone(),
5032 );
5033 self.timers.stop(
5034 Timer::PerPath(path_id, AbandonFromValidation),
5035 qlog.clone(),
5036 );
5037
5038 let next_challenge = path
5039 .data
5040 .earliest_on_path_expiring_challenge()
5041 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
5042 self.timers.set_or_stop(
5043 Timer::PerPath(path_id, PathChallengeLost),
5044 next_challenge,
5045 qlog,
5046 );
5047
5048 if !was_open {
5049 if is_multipath_negotiated {
5050 self.events.push_back(Event::Path(
5051 PathEvent::Established { id: path_id },
5052 ));
5053 }
5054 if let Some(observed) =
5055 path.data.last_observed_addr_report.as_ref()
5056 {
5057 self.events.push_back(Event::Path(
5058 PathEvent::ObservedAddr {
5059 id: path_id,
5060 addr: observed.socket_addr(),
5061 },
5062 ));
5063 }
5064 }
5065 if let Some((_, ref mut prev)) = path.prev {
5066 prev.reset_on_path_challenges();
5071 }
5072 }
5073 Ignored {
5074 sent_on,
5075 current_path,
5076 } => {
5077 debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
5078 }
5079 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
5080 }
5081 }
5082 }
5083 Frame::MaxData(frame::MaxData(bytes)) => {
5084 self.streams.received_max_data(bytes);
5085 }
5086 Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
5087 self.streams.received_max_stream_data(id, offset)?;
5088 }
5089 Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
5090 self.streams.received_max_streams(dir, count)?;
5091 }
5092 Frame::ResetStream(frame) => {
5093 if self.streams.received_reset(frame)?.should_transmit() {
5094 self.spaces[SpaceId::Data].pending.max_data = true;
5095 }
5096 }
5097 Frame::DataBlocked(DataBlocked(offset)) => {
5098 debug!(offset, "peer claims to be blocked at connection level");
5099 }
5100 Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
5101 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
5102 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
5103 return Err(TransportError::STREAM_STATE_ERROR(
5104 "STREAM_DATA_BLOCKED on send-only stream",
5105 ));
5106 }
5107 debug!(
5108 stream = %id,
5109 offset, "peer claims to be blocked at stream level"
5110 );
5111 }
5112 Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
5113 if limit > MAX_STREAM_COUNT {
5114 return Err(TransportError::FRAME_ENCODING_ERROR(
5115 "unrepresentable stream limit",
5116 ));
5117 }
5118 debug!(
5119 "peer claims to be blocked opening more than {} {} streams",
5120 limit, dir
5121 );
5122 }
5123 Frame::StopSending(frame::StopSending { id, error_code }) => {
5124 if id.initiator() != self.side.side() {
5125 if id.dir() == Dir::Uni {
5126 debug!("got STOP_SENDING on recv-only {}", id);
5127 return Err(TransportError::STREAM_STATE_ERROR(
5128 "STOP_SENDING on recv-only stream",
5129 ));
5130 }
5131 } else if self.streams.is_local_unopened(id) {
5132 return Err(TransportError::STREAM_STATE_ERROR(
5133 "STOP_SENDING on unopened stream",
5134 ));
5135 }
5136 self.streams.received_stop_sending(id, error_code);
5137 }
5138 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
5139 if let Some(ref path_id) = path_id {
5140 span.record("path", tracing::field::display(&path_id));
5141 }
5142 let path_id = path_id.unwrap_or_default();
5143 match self.local_cid_state.get_mut(&path_id) {
5144 None => debug!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
5145 Some(cid_state) => {
5146 let allow_more_cids = cid_state
5147 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
5148
5149 let has_path = !self.abandoned_paths.contains(&path_id);
5153 let allow_more_cids = allow_more_cids && has_path;
5154
5155 debug_assert!(!self.state.is_drained()); self.endpoint_events
5157 .push_back(EndpointEventInner::RetireConnectionId(
5158 now,
5159 path_id,
5160 sequence,
5161 allow_more_cids,
5162 ));
5163 }
5164 }
5165 }
5166 Frame::NewConnectionId(frame) => {
5167 let path_id = if let Some(path_id) = frame.path_id {
5168 if !self.is_multipath_negotiated() {
5169 return Err(TransportError::PROTOCOL_VIOLATION(
5170 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
5171 ));
5172 }
5173 if path_id > self.local_max_path_id {
5174 return Err(TransportError::PROTOCOL_VIOLATION(
5175 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
5176 ));
5177 }
5178 path_id
5179 } else {
5180 PathId::ZERO
5181 };
5182
5183 if let Some(ref path_id) = frame.path_id {
5184 span.record("path", tracing::field::display(&path_id));
5185 }
5186
5187 if self.abandoned_paths.contains(&path_id) {
5188 trace!("ignoring issued CID for abandoned path");
5189 continue;
5190 }
5191 let remote_cids = self
5192 .remote_cids
5193 .entry(path_id)
5194 .or_insert_with(|| CidQueue::new(frame.id));
5195 if remote_cids.active().is_empty() {
5196 return Err(TransportError::PROTOCOL_VIOLATION(
5197 "NEW_CONNECTION_ID when CIDs aren't in use",
5198 ));
5199 }
5200 if frame.retire_prior_to > frame.sequence {
5201 return Err(TransportError::PROTOCOL_VIOLATION(
5202 "NEW_CONNECTION_ID retiring unissued CIDs",
5203 ));
5204 }
5205
5206 use crate::cid_queue::InsertError;
5207 match remote_cids.insert(frame) {
5208 Ok(None) => {
5209 self.open_nat_traversed_paths(now);
5210 }
5211 Ok(Some((retired, reset_token))) => {
5212 let pending_retired =
5213 &mut self.spaces[SpaceId::Data].pending.retire_cids;
5214 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
5217 if (pending_retired.len() as u64)
5220 .saturating_add(retired.end.saturating_sub(retired.start))
5221 > MAX_PENDING_RETIRED_CIDS
5222 {
5223 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
5224 "queued too many retired CIDs",
5225 ));
5226 }
5227 pending_retired.extend(retired.map(|seq| (path_id, seq)));
5228 self.set_reset_token(path_id, network_path.remote, reset_token);
5229 self.open_nat_traversed_paths(now);
5230 }
5231 Err(InsertError::ExceedsLimit) => {
5232 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
5233 }
5234 Err(InsertError::Retired) => {
5235 trace!("discarding already-retired");
5236 self.spaces[SpaceId::Data]
5240 .pending
5241 .retire_cids
5242 .push((path_id, frame.sequence));
5243 continue;
5244 }
5245 };
5246
5247 if self.side.is_server()
5248 && path_id == PathId::ZERO
5249 && self
5250 .remote_cids
5251 .get(&PathId::ZERO)
5252 .map(|cids| cids.active_seq() == 0)
5253 .unwrap_or_default()
5254 {
5255 self.update_remote_cid(PathId::ZERO);
5258 }
5259 }
5260 Frame::NewToken(NewToken { token }) => {
5261 let ConnectionSide::Client {
5262 token_store,
5263 server_name,
5264 ..
5265 } = &self.side
5266 else {
5267 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
5268 };
5269 if token.is_empty() {
5270 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
5271 }
5272 trace!("got new token");
5273 token_store.insert(server_name, token);
5274 }
5275 Frame::Datagram(datagram) => {
5276 if self
5277 .datagrams
5278 .received(datagram, &self.config.datagram_receive_buffer_size)?
5279 {
5280 self.events.push_back(Event::DatagramReceived);
5281 }
5282 }
5283 Frame::AckFrequency(ack_frequency) => {
5284 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
5287 continue;
5290 }
5291
5292 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
5294 space.pending_acks.set_ack_frequency_params(&ack_frequency);
5295
5296 if !self.abandoned_paths.contains(path_id)
5300 && let Some(timeout) = space
5301 .pending_acks
5302 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
5303 {
5304 self.timers.set(
5305 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
5306 timeout,
5307 self.qlog.with_time(now),
5308 );
5309 }
5310 }
5311 }
5312 Frame::ImmediateAck => {
5313 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
5315 pns.pending_acks.set_immediate_ack_required();
5316 }
5317 }
5318 Frame::HandshakeDone => {
5319 if self.side.is_server() {
5320 return Err(TransportError::PROTOCOL_VIOLATION(
5321 "client sent HANDSHAKE_DONE",
5322 ));
5323 }
5324 if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
5325 self.discard_space(now, SpaceKind::Handshake);
5326 self.events.push_back(Event::HandshakeConfirmed);
5327 trace!("handshake confirmed");
5328 }
5329 }
5330 Frame::ObservedAddr(observed) => {
5331 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
5333 if !self
5334 .peer_params
5335 .address_discovery_role
5336 .should_report(&self.config.address_discovery_role)
5337 {
5338 return Err(TransportError::PROTOCOL_VIOLATION(
5339 "received OBSERVED_ADDRESS frame when not negotiated",
5340 ));
5341 }
5342 if packet.header.space() != SpaceKind::Data {
5344 return Err(TransportError::PROTOCOL_VIOLATION(
5345 "OBSERVED_ADDRESS frame outside data space",
5346 ));
5347 }
5348
5349 let path = self.path_data_mut(path_id);
5350 if path.network_path.remote == network_path.remote {
5351 if let Some(updated) = path.update_observed_addr_report(observed)
5352 && path.open_status == paths::OpenStatus::Informed
5353 {
5354 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5355 id: path_id,
5356 addr: updated,
5357 }));
5358 }
5360 } else {
5361 migration_observed_addr = Some(observed)
5363 }
5364 }
5365 Frame::PathAbandon(frame::PathAbandon {
5366 path_id,
5367 error_code,
5368 }) => {
5369 span.record("path", tracing::field::display(&path_id));
5370 match self.close_path_inner(
5371 now,
5372 path_id,
5373 PathAbandonReason::RemoteAbandoned {
5374 error_code: error_code.into(),
5375 },
5376 ) {
5377 Ok(()) => {
5378 trace!("peer abandoned path");
5379 }
5380 Err(ClosePathError::ClosedPath) => {
5381 trace!("peer abandoned already closed path");
5382 }
5383 Err(ClosePathError::MultipathNotNegotiated) => {
5384 return Err(TransportError::PROTOCOL_VIOLATION(
5385 "received PATH_ABANDON frame when multipath was not negotiated",
5386 ));
5387 }
5388 Err(ClosePathError::LastOpenPath) => {
5389 error!(
5392 "peer abandoned last path but close_path_inner returned LastOpenPath"
5393 );
5394 }
5395 };
5396
5397 if let Some(path) = self.paths.get_mut(&path_id)
5399 && !mem::replace(&mut path.data.draining, true)
5400 {
5401 let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5402 let pto = path.data.rtt.pto_base() + ack_delay;
5403 self.timers.set(
5404 Timer::PerPath(path_id, PathTimer::PathDrained),
5405 now + 3 * pto,
5406 self.qlog.with_time(now),
5407 );
5408
5409 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5410 }
5411 }
5412 Frame::PathStatusAvailable(info) => {
5413 span.record("path", tracing::field::display(&info.path_id));
5414 if self.is_multipath_negotiated() {
5415 self.on_path_status(
5416 info.path_id,
5417 PathStatus::Available,
5418 info.status_seq_no,
5419 );
5420 } else {
5421 return Err(TransportError::PROTOCOL_VIOLATION(
5422 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5423 ));
5424 }
5425 }
5426 Frame::PathStatusBackup(info) => {
5427 span.record("path", tracing::field::display(&info.path_id));
5428 if self.is_multipath_negotiated() {
5429 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5430 } else {
5431 return Err(TransportError::PROTOCOL_VIOLATION(
5432 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5433 ));
5434 }
5435 }
5436 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5437 span.record("path", tracing::field::display(&path_id));
5438 if !self.is_multipath_negotiated() {
5439 return Err(TransportError::PROTOCOL_VIOLATION(
5440 "received MAX_PATH_ID frame when multipath was not negotiated",
5441 ));
5442 }
5443 if path_id > self.remote_max_path_id {
5445 self.remote_max_path_id = path_id;
5446 self.issue_first_path_cids(now);
5447 self.open_nat_traversed_paths(now);
5448 }
5449 }
5450 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5451 if self.is_multipath_negotiated() {
5455 if max_path_id > self.local_max_path_id {
5456 return Err(TransportError::PROTOCOL_VIOLATION(
5457 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5458 ));
5459 }
5460 debug!("received PATHS_BLOCKED({:?})", max_path_id);
5461 } else {
5463 return Err(TransportError::PROTOCOL_VIOLATION(
5464 "received PATHS_BLOCKED frame when not multipath was not negotiated",
5465 ));
5466 }
5467 }
5468 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5469 if self.is_multipath_negotiated() {
5477 if path_id > self.local_max_path_id {
5478 return Err(TransportError::PROTOCOL_VIOLATION(
5479 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5480 ));
5481 }
5482 if next_seq.0
5483 > self
5484 .local_cid_state
5485 .get(&path_id)
5486 .map(|cid_state| cid_state.active_seq().1 + 1)
5487 .unwrap_or_default()
5488 {
5489 return Err(TransportError::PROTOCOL_VIOLATION(
5490 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5491 ));
5492 }
5493 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5494 } else {
5495 return Err(TransportError::PROTOCOL_VIOLATION(
5496 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5497 ));
5498 }
5499 }
5500 Frame::AddAddress(addr) => {
5501 let client_state = match self.n0_nat_traversal.client_side_mut() {
5502 Ok(state) => state,
5503 Err(err) => {
5504 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5505 "Nat traversal(ADD_ADDRESS): {err}"
5506 )));
5507 }
5508 };
5509
5510 if !client_state.check_remote_address(&addr) {
5511 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5513 }
5514
5515 match client_state.add_remote_address(addr) {
5516 Ok(maybe_added) => {
5517 if let Some(added) = maybe_added {
5518 self.events.push_back(Event::NatTraversal(
5519 n0_nat_traversal::Event::AddressAdded(added),
5520 ));
5521 }
5522 }
5523 Err(e) => {
5524 warn!(%e, "failed to add remote address")
5525 }
5526 }
5527 }
5528 Frame::RemoveAddress(addr) => {
5529 let client_state = match self.n0_nat_traversal.client_side_mut() {
5530 Ok(state) => state,
5531 Err(err) => {
5532 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5533 "Nat traversal(REMOVE_ADDRESS): {err}"
5534 )));
5535 }
5536 };
5537 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5538 self.events.push_back(Event::NatTraversal(
5539 n0_nat_traversal::Event::AddressRemoved(removed_addr),
5540 ));
5541 }
5542 }
5543 Frame::ReachOut(reach_out) => {
5544 let ipv6 = self.is_ipv6();
5545 let server_state = match self.n0_nat_traversal.server_side_mut() {
5546 Ok(state) => state,
5547 Err(err) => {
5548 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5549 "Nat traversal(REACH_OUT): {err}"
5550 )));
5551 }
5552 };
5553
5554 let round_before = server_state.current_round();
5555
5556 if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5557 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5558 "Nat traversal(REACH_OUT): {err}"
5559 )));
5560 }
5561
5562 if server_state.current_round() > round_before {
5563 if let Some(delay) =
5565 self.n0_nat_traversal.retry_delay(self.config.initial_rtt)
5566 {
5567 self.timers.set(
5568 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
5569 now + delay,
5570 self.qlog.with_time(now),
5571 );
5572 }
5573 }
5574 }
5575 }
5576 }
5577
5578 let space = self.spaces[SpaceId::Data].for_path(path_id);
5579 if space
5580 .pending_acks
5581 .packet_received(now, number, ack_eliciting, &space.dedup)
5582 {
5583 if self.abandoned_paths.contains(&path_id) {
5584 space.pending_acks.set_immediate_ack_required();
5587 } else {
5588 self.timers.set(
5589 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5590 now + self.ack_frequency.max_ack_delay,
5591 self.qlog.with_time(now),
5592 );
5593 }
5594 }
5595
5596 let pending = &mut self.spaces[SpaceId::Data].pending;
5601 self.streams.queue_max_stream_id(pending);
5602
5603 if let Some(reason) = close {
5604 self.state.move_to_draining(Some(reason.into()));
5605 self.endpoint_events.push_back(EndpointEventInner::Draining);
5606 self.connection_close_pending = true;
5607 }
5608
5609 let migrate_on_any_packet =
5612 self.is_multipath_negotiated() && !self.n0_nat_traversal.is_negotiated();
5613
5614 let is_largest_received_pn = Some(number)
5616 == self.spaces[SpaceId::Data]
5617 .for_path(path_id)
5618 .largest_received_packet_number;
5619
5620 if (migrate_on_any_packet || !is_probing_packet)
5625 && is_largest_received_pn
5626 && self.local_ip_may_migrate()
5627 && let Some(new_local_ip) = network_path.local_ip
5628 {
5629 let path_data = self.path_data_mut(path_id);
5630 if path_data
5631 .network_path
5632 .local_ip
5633 .is_some_and(|ip| ip != new_local_ip)
5634 {
5635 debug!(
5636 %path_id,
5637 new_4tuple = %network_path,
5638 prev_4tuple = %path_data.network_path,
5639 "local address passive migration"
5640 );
5641 }
5642 path_data.network_path.local_ip = Some(new_local_ip)
5643 }
5644
5645 if self.peer_may_migrate()
5647 && (migrate_on_any_packet || !is_probing_packet)
5648 && is_largest_received_pn
5649 && network_path.remote != self.path_data(path_id).network_path.remote
5650 {
5651 self.migrate(path_id, now, network_path, migration_observed_addr);
5652 self.update_remote_cid(path_id);
5654 self.spin = false;
5655 }
5656
5657 Ok(())
5658 }
5659
5660 fn open_nat_traversed_paths(&mut self, now: Instant) {
5662 while let Some(network_path) = self
5663 .n0_nat_traversal
5664 .client_side_mut()
5665 .ok()
5666 .and_then(|s| s.pop_pending_path_open())
5667 {
5668 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
5669 Ok((path_id, already_existed)) => {
5670 debug!(
5671 %path_id,
5672 ?network_path,
5673 new_path = !already_existed,
5674 "Opened NAT traversal path",
5675 );
5676 }
5677 Err(err) => match err {
5678 PathError::MultipathNotNegotiated
5679 | PathError::ServerSideNotAllowed
5680 | PathError::ValidationFailed
5681 | PathError::InvalidRemoteAddress(_) => {
5682 error!(
5683 ?err,
5684 ?network_path,
5685 "Failed to open path for successful NAT traversal"
5686 );
5687 }
5688 PathError::MaxPathIdReached | PathError::RemoteCidsExhausted => {
5689 self.n0_nat_traversal
5691 .client_side_mut()
5692 .map(|s| s.push_pending_path_open(network_path))
5693 .ok();
5694 debug!(
5695 ?err,
5696 ?network_path,
5697 "Blocked opening NAT traversal path, enqueued"
5698 );
5699 return;
5700 }
5701 },
5702 }
5703 }
5704 }
5705
5706 fn migrate(
5711 &mut self,
5712 path_id: PathId,
5713 now: Instant,
5714 network_path: FourTuple,
5715 observed_addr: Option<ObservedAddr>,
5716 ) {
5717 trace!(
5718 new_4tuple = %network_path,
5719 prev_4tuple = %self.path_data(path_id).network_path,
5720 %path_id,
5721 "migration initiated",
5722 );
5723 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
5724 let prev_pto = self.pto(SpaceKind::Data, path_id);
5731 let path = self.paths.get_mut(&path_id).expect("known path");
5732 let mut new_path_data = if network_path.remote.is_ipv4()
5733 && network_path.remote.ip() == path.data.network_path.remote.ip()
5734 {
5735 PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
5736 } else {
5737 let peer_max_udp_payload_size =
5738 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
5739 .unwrap_or(u16::MAX);
5740 PathData::new(
5741 network_path,
5742 self.allow_mtud,
5743 Some(peer_max_udp_payload_size),
5744 self.path_generation_counter,
5745 now,
5746 &self.config,
5747 )
5748 };
5749 new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
5750 if let Some(report) = observed_addr
5751 && let Some(updated) = new_path_data.update_observed_addr_report(report)
5752 {
5753 tracing::info!("adding observed addr event from migration");
5754 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5755 id: path_id,
5756 addr: updated,
5757 }));
5758 }
5759 new_path_data.pending_on_path_challenge = true;
5760
5761 let mut prev_path_data = mem::replace(&mut path.data, new_path_data);
5762
5763 if !prev_path_data.validated
5772 && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
5773 {
5774 prev_path_data.pending_on_path_challenge = true;
5775 path.prev = Some((cid, prev_path_data));
5778 }
5779
5780 self.qlog.emit_tuple_assigned(path_id, network_path, now);
5782
5783 self.timers.set(
5784 Timer::PerPath(path_id, PathTimer::PathValidationFailed),
5785 now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
5786 self.qlog.with_time(now),
5787 );
5788 }
5789
5790 pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
5807 debug!("network changed");
5808 if self.state.is_drained() {
5809 return;
5810 }
5811 if self.highest_space < SpaceKind::Data {
5812 for path in self.paths.values_mut() {
5813 path.data.network_path.local_ip = None;
5815 }
5816
5817 self.update_remote_cid(PathId::ZERO);
5818 self.ping();
5819
5820 return;
5821 }
5822
5823 let mut non_recoverable_paths = Vec::default();
5826 let mut recoverable_paths = Vec::default();
5827 let mut open_paths = 0;
5828
5829 let is_multipath_negotiated = self.is_multipath_negotiated();
5830 let is_client = self.side().is_client();
5831 let immediate_ack_allowed = self.peer_supports_ack_frequency();
5832
5833 for (path_id, path) in self.paths.iter_mut() {
5834 if self.abandoned_paths.contains(path_id) {
5835 continue;
5836 }
5837 open_paths += 1;
5838
5839 let network_path = path.data.network_path;
5842
5843 path.data.network_path.local_ip = None;
5846 let remote = network_path.remote;
5847
5848 let attempt_to_recover = if is_multipath_negotiated {
5852 hint.map(|h| h.is_path_recoverable(*path_id, network_path))
5856 .unwrap_or(!is_client)
5857 } else {
5858 true
5860 };
5861
5862 if attempt_to_recover {
5863 recoverable_paths.push((*path_id, remote));
5864 } else {
5865 non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
5866 }
5867 }
5868
5869 let open_first = open_paths == non_recoverable_paths.len();
5878
5879 for (path_id, remote, status) in non_recoverable_paths.into_iter() {
5880 let network_path = FourTuple {
5881 remote,
5882 local_ip: None, };
5884
5885 if open_first && let Err(e) = self.open_path(network_path, status, now) {
5886 if self.side().is_client() {
5887 debug!(%e, "Failed to open new path for network change");
5888 }
5889 recoverable_paths.push((path_id, remote));
5891 continue;
5892 }
5893
5894 if let Err(e) =
5895 self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
5896 {
5897 debug!(%e,"Failed to close unrecoverable path after network change");
5898 recoverable_paths.push((path_id, remote));
5899 continue;
5900 }
5901
5902 if !open_first && let Err(e) = self.open_path(network_path, status, now) {
5903 debug!(%e,"Failed to open new path for network change");
5907 }
5908 }
5909
5910 for (path_id, remote) in recoverable_paths.into_iter() {
5913 if let Some(path_space) = self.spaces[SpaceId::Data].number_spaces.get_mut(&path_id) {
5915 path_space.ping_pending = true;
5916
5917 if immediate_ack_allowed {
5918 path_space.immediate_ack_pending = true;
5919 }
5920 }
5921
5922 if let Some(path) = self.paths.get_mut(&path_id) {
5927 path.data.pto_count = 0;
5928 }
5929 self.set_loss_detection_timer(now, path_id);
5930
5931 let Some((reset_token, retired)) =
5932 self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
5933 else {
5934 continue;
5935 };
5936
5937 self.spaces[SpaceId::Data]
5939 .pending
5940 .retire_cids
5941 .extend(retired.map(|seq| (path_id, seq)));
5942
5943 debug_assert!(!self.state.is_drained()); self.endpoint_events
5945 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5946 }
5947 }
5948
5949 fn update_remote_cid(&mut self, path_id: PathId) {
5951 let Some((reset_token, retired)) = self
5952 .remote_cids
5953 .get_mut(&path_id)
5954 .and_then(|cids| cids.next())
5955 else {
5956 return;
5957 };
5958
5959 self.spaces[SpaceId::Data]
5961 .pending
5962 .retire_cids
5963 .extend(retired.map(|seq| (path_id, seq)));
5964 let remote = self.path_data(path_id).network_path.remote;
5965 self.set_reset_token(path_id, remote, reset_token);
5966 }
5967
5968 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5977 debug_assert!(!self.state.is_drained()); self.endpoint_events
5979 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5980
5981 if path_id == PathId::ZERO {
5987 self.peer_params.stateless_reset_token = Some(reset_token);
5988 }
5989 }
5990
5991 fn issue_first_cids(&mut self, now: Instant) {
5993 if self
5994 .local_cid_state
5995 .get(&PathId::ZERO)
5996 .expect("PathId::ZERO exists when the connection is created")
5997 .cid_len()
5998 == 0
5999 {
6000 return;
6001 }
6002
6003 let mut n = self.peer_params.issue_cids_limit() - 1;
6005 if let ConnectionSide::Server { server_config } = &self.side
6006 && server_config.has_preferred_address()
6007 {
6008 n -= 1;
6010 }
6011 debug_assert!(!self.state.is_drained()); self.endpoint_events
6013 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6014 }
6015
6016 fn issue_first_path_cids(&mut self, now: Instant) {
6020 if let Some(max_path_id) = self.max_path_id() {
6021 let mut path_id = self.max_path_id_with_cids.next();
6022 while path_id <= max_path_id {
6023 self.endpoint_events
6024 .push_back(EndpointEventInner::NeedIdentifiers(
6025 path_id,
6026 now,
6027 self.peer_params.issue_cids_limit(),
6028 ));
6029 path_id = path_id.next();
6030 }
6031 self.max_path_id_with_cids = max_path_id;
6032 }
6033 }
6034
/// Writes all frames that are pending and permitted into `builder`.
///
/// The packet is being built for `path_id` in packet space `space_id`. Frames are written
/// in a fixed order (HANDSHAKE_DONE, PING, IMMEDIATE_ACK, ACKs, ACK_FREQUENCY,
/// PATH_CHALLENGE, PATH_RESPONSE, REACH_OUT, PATH_ABANDON, OBSERVED_ADDR, CRYPTO, path
/// status, MAX_PATH_ID, PATHS_BLOCKED, PATH_CIDS_BLOCKED, stream control frames,
/// NEW_CONNECTION_ID, RETIRE_CONNECTION_ID, DATAGRAM, NEW_TOKEN, ADD_ADDRESS,
/// REMOVE_ADDRESS, STREAM), each guarded by the remaining space in `builder`.
///
/// `scheduling_info` restricts what may go on this path:
/// - `is_abandoned`: almost nothing is written, except PATH_ABANDON frames.
/// - `may_send_data`: gates everything that is not tied to this specific path.
/// - `may_self_abandon`: allows the PATH_ABANDON for `path_id` itself on an abandoned path.
///
/// Panics if `path_id` is not a known path.
fn populate_packet<'a, 'b>(
    &mut self,
    now: Instant,
    space_id: SpaceId,
    path_id: PathId,
    scheduling_info: &PathSchedulingInfo,
    builder: &mut PacketBuilder<'a, 'b>,
) {
    let is_multipath_negotiated = self.is_multipath_negotiated();
    let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
    // Data space without keys for that level means we are writing a 0-RTT packet.
    let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
    let stats = &mut self.path_stats.for_path(path_id).frame_tx;
    let space = &mut self.spaces[space_id];
    let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
    space
        .for_path(path_id)
        .pending_acks
        .maybe_ack_non_eliciting();

    // HANDSHAKE_DONE
    if !is_0rtt
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && mem::replace(&mut space.pending.handshake_done, false)
    {
        builder.write_frame(frame::HandshakeDone, stats);
    }

    // PING: path specific, so not gated on `may_send_data`.
    if !scheduling_info.is_abandoned
        && mem::replace(&mut space.for_path(path_id).ping_pending, false)
    {
        builder.write_frame(frame::Ping, stats);
    }

    // IMMEDIATE_ACK: path specific as well.
    if !scheduling_info.is_abandoned
        && mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false)
    {
        debug_assert_eq!(
            space_id,
            SpaceId::Data,
            "immediate acks must be sent in the data space"
        );
        builder.write_frame(frame::ImmediateAck, stats);
    }

    // ACKs for every path of this space that has something to acknowledge.
    if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
        // The path ids are collected first so `space` can be passed on mutably below.
        for path_id in space
            .number_spaces
            .iter_mut()
            .filter(|(_, pns)| pns.pending_acks.can_send())
            .map(|(&path_id, _)| path_id)
            .collect::<Vec<_>>()
        {
            Self::populate_acks(
                now,
                self.receiving_ecn,
                path_id,
                space_id,
                space,
                is_multipath_negotiated,
                builder,
                stats,
                space_has_keys,
            );
        }
    }

    // ACK_FREQUENCY
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && mem::replace(&mut space.pending.ack_frequency, false)
    {
        let sequence_number = self.ack_frequency.next_sequence_number();

        // NOTE(review): presumably `pending.ack_frequency` is only ever set when an
        // ACK frequency config exists, otherwise this unwrap panics. Confirm at the
        // place where the flag is set.
        let config = self.config.ack_frequency_config.as_ref().unwrap();

        let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
            path.rtt.get(),
            config,
            &self.peer_params,
        );

        let frame = frame::AckFrequency {
            sequence: sequence_number,
            ack_eliciting_threshold: config.ack_eliciting_threshold,
            // Saturates at VarInt::MAX if the delay in microseconds does not fit.
            request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
            reordering_threshold: config.reordering_threshold,
        };
        builder.write_frame(frame, stats);

        self.ack_frequency
            .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
    }

    // PATH_CHALLENGE
    if !scheduling_info.is_abandoned
        && space_id == SpaceId::Data
        && path.pending_on_path_challenge
        && !self.state.is_closed()
        && builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
    {
        path.pending_on_path_challenge = false;

        // A fresh random token is generated for every challenge that is sent.
        let token = self.rng.random();
        path.record_path_challenge_sent(now, token, path.network_path);
        let challenge = frame::PathChallenge(token);
        builder.write_frame(challenge, stats);
        builder.require_padding();
        let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
        match path.open_status {
            paths::OpenStatus::Sent | paths::OpenStatus::Informed => {}
            paths::OpenStatus::Pending => {
                // First challenge on a path being opened: start the overall
                // validation deadline of three PTOs.
                path.open_status = paths::OpenStatus::Sent;
                self.timers.set(
                    Timer::PerPath(path_id, PathTimer::AbandonFromValidation),
                    now + 3 * pto,
                    self.qlog.with_time(now),
                );
            }
        }

        // Timer for considering this particular challenge lost.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
            now + pto,
            self.qlog.with_time(now),
        );

        // NOTE(review): `path.pending_on_path_challenge` was set to `false` at the top
        // of this branch. Unless `record_path_challenge_sent` re-arms it, this
        // condition can never hold and the path status is never queued here. Confirm.
        if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
            space.pending.path_status.insert(path_id);
        }

        // Piggy-back an OBSERVED_ADDR on the challenge when address discovery says we
        // should report. (`space_id == SpaceId::Data` is already implied by the
        // enclosing condition.)
        if space_id == SpaceId::Data
            && self
                .config
                .address_discovery_role
                .should_report(&self.peer_params.address_discovery_role)
        {
            let frame = frame::ObservedAddr::new(
                path.network_path.remote,
                self.next_observed_addr_seq_no,
            );
            if builder.frame_space_remaining() > frame.size() {
                builder.write_frame(frame, stats);

                self.next_observed_addr_seq_no =
                    self.next_observed_addr_seq_no.saturating_add(1u8);
                path.observed_addr_sent = true;

                space.pending.observed_addr = false;
            }
        }
    }

    // PATH_RESPONSE: only responses that belong to the path's current 4-tuple.
    if !scheduling_info.is_abandoned
        && space_id == SpaceId::Data
        && builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
        && let Some(token) = path.path_responses.pop_on_path(path.network_path)
    {
        let response = frame::PathResponse(token);
        builder.write_frame(response, stats);
        builder.require_padding();

        // Same OBSERVED_ADDR piggy-backing as for PATH_CHALLENGE above.
        if space_id == SpaceId::Data
            && self
                .config
                .address_discovery_role
                .should_report(&self.peer_params.address_discovery_role)
        {
            let frame = frame::ObservedAddr::new(
                path.network_path.remote,
                self.next_observed_addr_seq_no,
            );
            if builder.frame_space_remaining() > frame.size() {
                builder.write_frame(frame, stats);

                self.next_observed_addr_seq_no =
                    self.next_observed_addr_seq_no.saturating_add(1u8);
                path.observed_addr_sent = true;

                space.pending.observed_addr = false;
            }
        }
    }

    // REACH_OUT: pop queued frames for as long as the next one still fits.
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && let Some(reach_out) = space
            .pending
            .reach_out
            .pop_if(|frame| builder.frame_space_remaining() >= frame.size())
    {
        builder.write_frame(reach_out, stats);
    }

    // PATH_ABANDON for this very path, when it is allowed to announce its own abandon.
    if space_id == SpaceId::Data
        && scheduling_info.is_abandoned
        && scheduling_info.may_self_abandon
        && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
        && let Some(error_code) = space.pending.path_abandon.remove(&path_id)
    {
        let frame = frame::PathAbandon {
            path_id,
            error_code,
        };
        builder.write_frame(frame, stats);

        // The remote CIDs of the abandoned path are dropped once the frame is written.
        self.remote_cids.remove(&path_id);
    }
    // PATH_ABANDON for any other path, sent on a path that may carry data.
    while space_id == SpaceId::Data
        && scheduling_info.may_send_data
        && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
        && let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
    {
        let frame = frame::PathAbandon {
            path_id: abandoned_path_id,
            error_code,
        };
        builder.write_frame(frame, stats);

        self.remote_cids.remove(&abandoned_path_id);
    }

    // OBSERVED_ADDR: sent if none was sent on this path yet or one is explicitly pending.
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space_id == SpaceId::Data
        && self
            .config
            .address_discovery_role
            .should_report(&self.peer_params.address_discovery_role)
        && (!path.observed_addr_sent || space.pending.observed_addr)
    {
        let frame =
            frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
        if builder.frame_space_remaining() > frame.size() {
            builder.write_frame(frame, stats);

            self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
            path.observed_addr_sent = true;

            space.pending.observed_addr = false;
        }
    }

    // CRYPTO: never in 0-RTT packets.
    while !is_0rtt
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
    {
        let Some(mut frame) = space.pending.crypto.pop_front() else {
            break;
        };

        // Room left for payload once the frame header has been accounted for.
        // NOTE(review): this subtraction presumably cannot underflow because of the
        // `SIZE_BOUND` check in the loop condition. Confirm against `SIZE_BOUND`.
        // NOTE(review): the `unsafe` call assumes `frame.offset` fits a QUIC varint
        // (< 2^62). Nothing in this function establishes that; verify at the producer.
        let max_crypto_data_size = builder.frame_space_remaining()
            - 1 // frame type
            - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
            - 2; // length field: `len` is capped below 2^14, which a 2-byte varint encodes

        let len = frame
            .data
            .len()
            .min(2usize.pow(14) - 1)
            .min(max_crypto_data_size);

        let data = frame.data.split_to(len);
        let offset = frame.offset;
        let truncated = frame::Crypto { offset, data };
        builder.write_frame(truncated, stats);

        // Whatever did not fit goes back to the front of the queue at its new offset.
        if !frame.data.is_empty() {
            frame.offset += len as u64;
            space.pending.crypto.push_front(frame);
        }
    }

    // PATH_STATUS_AVAILABLE / PATH_STATUS_BACKUP
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let Some(path_id) = space.pending.path_status.pop_first() else {
            break;
        };
        // Note: `path_id` and `path` shadow the outer bindings inside this loop.
        let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
            trace!(%path_id, "discarding queued path status for unknown path");
            continue;
        };

        let seq = path.status.seq();
        match path.local_status() {
            PathStatus::Available => {
                let frame = frame::PathStatusAvailable {
                    path_id,
                    status_seq_no: seq,
                };
                builder.write_frame(frame, stats);
            }
            PathStatus::Backup => {
                let frame = frame::PathStatusBackup {
                    path_id,
                    status_seq_no: seq,
                };
                builder.write_frame(frame, stats);
            }
        }
    }

    // MAX_PATH_ID
    if space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space.pending.max_path_id
        && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let frame = frame::MaxPathId(self.local_max_path_id);
        builder.write_frame(frame, stats);
        space.pending.max_path_id = false;
    }

    // PATHS_BLOCKED
    if space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space.pending.paths_blocked
        && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let frame = frame::PathsBlocked(self.remote_max_path_id);
        builder.write_frame(frame, stats);
        space.pending.paths_blocked = false;
    }

    // PATH_CIDS_BLOCKED
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
    {
        let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
            break;
        };
        // Next expected sequence number: one past the active CID, or 0 if the path
        // has no remote CID queue at all.
        let next_seq = match self.remote_cids.get(&path_id) {
            Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
            None => VarInt(0),
        };
        let frame = frame::PathCidsBlocked { path_id, next_seq };
        builder.write_frame(frame, stats);
    }

    // Stream-level control frames (written by the streams state).
    if space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
    {
        self.streams
            .write_control_frames(builder, &mut space.pending, stats);
    }

    // NEW_CONNECTION_ID: the size bound uses the longest CID length of any path.
    let cid_len = self
        .local_cid_state
        .values()
        .map(|cid_state| cid_state.cid_len())
        .max()
        .expect("some local CID state must exist");
    let new_cid_size_bound =
        frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > new_cid_size_bound
    {
        let Some(issued) = space.pending.new_cids.pop() else {
            break;
        };
        let Some(cid_state) = self.local_cid_state.get(&issued.path_id) else {
            // The CID state of the path is gone, so the queued CID is dropped.
            debug!(
                path = %issued.path_id, seq = issued.sequence,
                "dropping queued NEW_CONNECTION_ID for discarded path",
            );
            continue;
        };
        let retire_prior_to = cid_state.retire_prior_to();

        // The frame only carries a path id when multipath is negotiated.
        let cid_path_id = match is_multipath_negotiated {
            true => Some(issued.path_id),
            false => {
                debug_assert_eq!(issued.path_id, PathId::ZERO);
                None
            }
        };
        let frame = frame::NewConnectionId {
            path_id: cid_path_id,
            sequence: issued.sequence,
            retire_prior_to,
            id: issued.id,
            reset_token: issued.reset_token,
        };
        builder.write_frame(frame, stats);
    }

    // RETIRE_CONNECTION_ID
    let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > retire_cid_bound
    {
        // Without multipath, path zero is encoded without a path id.
        let (path_id, sequence) = match space.pending.retire_cids.pop() {
            Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
            Some((path_id, seq)) => (Some(path_id), seq),
            None => break,
        };
        let frame = frame::RetireConnectionId { path_id, sequence };
        builder.write_frame(frame, stats);
    }

    // DATAGRAM
    let mut sent_datagrams = false;
    while !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && builder.frame_space_remaining() > Datagram::SIZE_BOUND
        && space_id == SpaceId::Data
    {
        match self.datagrams.write(builder, stats) {
            true => {
                sent_datagrams = true;
            }
            false => break,
        }
    }
    // Tell the application it may queue datagrams again once some were flushed.
    if self.datagrams.send_blocked && sent_datagrams {
        self.events.push_back(Event::DatagramsUnblocked);
        self.datagrams.send_blocked = false;
    }

    // Re-borrow the path data: `self.paths` was borrowed immutably in between.
    let path = &mut self.paths.get_mut(&path_id).expect("known path").data;

    // NEW_TOKEN
    if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
        while let Some(network_path) = space.pending.new_tokens.pop() {
            debug_assert_eq!(space_id, SpaceId::Data);
            let ConnectionSide::Server { server_config } = &self.side else {
                panic!("NEW_TOKEN frames should not be enqueued by clients");
            };

            // A token queued for a different network path than the current one is
            // discarded, not re-queued.
            if !network_path.is_probably_same_path(&path.network_path) {
                continue;
            }

            let token = Token::new(
                TokenPayload::Validation {
                    ip: network_path.remote.ip(),
                    issued: server_config.time_source.now(),
                },
                &mut self.rng,
            );
            let new_token = NewToken {
                token: token.encode(&*server_config.token_key).into(),
            };

            // Does not fit: put the request back and retry in a later packet.
            if builder.frame_space_remaining() < new_token.size() {
                space.pending.new_tokens.push(network_path);
                break;
            }

            builder.write_frame(new_token, stats);
            // Recorded in the packet's retransmits under the network path it was issued for.
            builder.retransmits_mut().new_tokens.push(network_path);
        }
    }

    // ADD_ADDRESS
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
    {
        if let Some(added_address) = space.pending.add_address.pop_last() {
            builder.write_frame(added_address, stats);
        } else {
            break;
        }
    }

    // REMOVE_ADDRESS
    while space_id == SpaceId::Data
        && !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
    {
        if let Some(removed_address) = space.pending.remove_address.pop_last() {
            builder.write_frame(removed_address, stats);
        } else {
            break;
        }
    }

    // STREAM: written last.
    if !scheduling_info.is_abandoned
        && scheduling_info.may_send_data
        && space_id == SpaceId::Data
    {
        self.streams
            .write_stream_frames(builder, self.config.send_fairness, stats);
    }
}
6576
6577 fn populate_acks<'a, 'b>(
6579 now: Instant,
6580 receiving_ecn: bool,
6581 path_id: PathId,
6582 space_id: SpaceId,
6583 space: &mut PacketSpace,
6584 is_multipath_negotiated: bool,
6585 builder: &mut PacketBuilder<'a, 'b>,
6586 stats: &mut FrameStats,
6587 space_has_keys: bool,
6588 ) {
6589 debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6591
6592 debug_assert!(
6593 is_multipath_negotiated || path_id == PathId::ZERO,
6594 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6595 );
6596 if is_multipath_negotiated {
6597 debug_assert!(
6598 space_id == SpaceId::Data || path_id == PathId::ZERO,
6599 "path acks must be sent in 1RTT space (have {space_id:?})"
6600 );
6601 }
6602
6603 let pns = space.for_path(path_id);
6604 let ranges = pns.pending_acks.ranges();
6605 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6606 let ecn = if receiving_ecn {
6607 Some(&pns.ecn_counters)
6608 } else {
6609 None
6610 };
6611
6612 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6613 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6615 let delay = delay_micros >> ack_delay_exp.into_inner();
6616
6617 if is_multipath_negotiated && space_id == SpaceId::Data {
6618 if !ranges.is_empty() {
6619 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6620 builder.write_frame(frame, stats);
6621 }
6622 } else {
6623 builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6624 }
6625 }
6626
6627 fn close_common(&mut self) {
6628 trace!("connection closed");
6629 self.timers.reset();
6630 }
6631
6632 fn set_close_timer(&mut self, now: Instant) {
6633 let pto_max = self.max_pto_for_space(self.highest_space);
6636 self.timers.set(
6637 Timer::Conn(ConnTimer::Close),
6638 now + 3 * pto_max,
6639 self.qlog.with_time(now),
6640 );
6641 }
6642
6643 fn handle_peer_params(
6648 &mut self,
6649 params: TransportParameters,
6650 local_cid: ConnectionId,
6651 remote_cid: ConnectionId,
6652 now: Instant,
6653 ) -> Result<(), TransportError> {
6654 if Some(self.original_remote_cid) != params.initial_src_cid
6655 || (self.side.is_client()
6656 && (Some(self.initial_dst_cid) != params.original_dst_cid
6657 || self.retry_src_cid != params.retry_src_cid))
6658 {
6659 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6660 "CID authentication failure",
6661 ));
6662 }
6663 if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6664 return Err(TransportError::PROTOCOL_VIOLATION(
6665 "multipath must not use zero-length CIDs",
6666 ));
6667 }
6668
6669 self.set_peer_params(params);
6670 self.qlog.emit_peer_transport_params_received(self, now);
6671
6672 Ok(())
6673 }
6674
6675 fn set_peer_params(&mut self, params: TransportParameters) {
6676 self.streams.set_params(¶ms);
6677 self.idle_timeout =
6678 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
6679 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
6680
6681 if let Some(ref info) = params.preferred_address {
6682 self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
6684 path_id: None,
6685 sequence: 1,
6686 id: info.connection_id,
6687 reset_token: info.stateless_reset_token,
6688 retire_prior_to: 0,
6689 })
6690 .expect(
6691 "preferred address CID is the first received, and hence is guaranteed to be legal",
6692 );
6693 let remote = self.path_data(PathId::ZERO).network_path.remote;
6694 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
6695 }
6696 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
6697
6698 let mut multipath_enabled = false;
6699 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
6700 self.config.get_initial_max_path_id(),
6701 params.initial_max_path_id,
6702 ) {
6703 self.local_max_path_id = local_max_path_id;
6705 self.remote_max_path_id = remote_max_path_id;
6706 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
6707 debug!(%initial_max_path_id, "multipath negotiated");
6708 multipath_enabled = true;
6709 }
6710
6711 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
6712 self.config
6713 .max_remote_nat_traversal_addresses
6714 .zip(params.max_remote_nat_traversal_addresses)
6715 {
6716 if multipath_enabled {
6717 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
6718 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
6719 self.n0_nat_traversal = n0_nat_traversal::State::new(
6720 max_remote_addresses,
6721 max_local_addresses,
6722 self.side(),
6723 );
6724 debug!(
6725 %max_remote_addresses, %max_local_addresses,
6726 "n0's nat traversal negotiated"
6727 );
6728 } else {
6729 debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
6730 }
6731 }
6732
6733 self.peer_params = params;
6734 let peer_max_udp_payload_size =
6735 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
6736 self.path_data_mut(PathId::ZERO)
6737 .mtud
6738 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
6739 }
6740
6741 fn decrypt_packet(
6743 &mut self,
6744 now: Instant,
6745 path_id: PathId,
6746 packet: &mut Packet,
6747 ) -> Result<Option<u64>, Option<TransportError>> {
6748 let result = self
6749 .crypto_state
6750 .decrypt_packet_body(packet, path_id, &self.spaces)?;
6751
6752 let Some(result) = result else {
6753 return Ok(None);
6754 };
6755
6756 if result.outgoing_key_update_acked
6757 && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6758 {
6759 prev.end_packet = Some((result.packet_number, now));
6760 self.set_key_discard_timer(now, packet.header.space());
6761 }
6762
6763 if result.incoming_key_update {
6764 trace!("key update authenticated");
6765 self.crypto_state
6766 .update_keys(Some((result.packet_number, now)), true);
6767 self.set_key_discard_timer(now, packet.header.space());
6768 }
6769
6770 Ok(Some(result.packet_number))
6771 }
6772
6773 fn peer_supports_ack_frequency(&self) -> bool {
6774 self.peer_params.min_ack_delay.is_some()
6775 }
6776
6777 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6782 debug_assert_eq!(
6783 self.highest_space,
6784 SpaceKind::Data,
6785 "immediate ack must be written in the data space"
6786 );
6787 self.spaces[SpaceId::Data]
6788 .for_path(path_id)
6789 .immediate_ack_pending = true;
6790 }
6791
6792 #[cfg(test)]
6794 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
6795 let ConnectionEventInner::Datagram(DatagramConnectionEvent {
6796 path_id,
6797 first_decode,
6798 remaining,
6799 ..
6800 }) = &event.0
6801 else {
6802 return None;
6803 };
6804
6805 if remaining.is_some() {
6806 panic!("Packets should never be coalesced in tests");
6807 }
6808
6809 let decrypted_header = self
6810 .crypto_state
6811 .unprotect_header(first_decode.clone(), self.peer_params.stateless_reset_token)?;
6812
6813 let mut packet = decrypted_header.packet?;
6814 self.crypto_state
6815 .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
6816 .ok()?;
6817
6818 Some(packet.payload.to_vec())
6819 }
6820
6821 #[cfg(test)]
6824 pub(crate) fn bytes_in_flight(&self) -> u64 {
6825 self.path_data(PathId::ZERO).in_flight.bytes
6827 }
6828
6829 #[cfg(test)]
6831 pub(crate) fn congestion_window(&self) -> u64 {
6832 let path = self.path_data(PathId::ZERO);
6833 path.congestion
6834 .window()
6835 .saturating_sub(path.in_flight.bytes)
6836 }
6837
6838 #[cfg(test)]
6840 pub(crate) fn is_idle(&self) -> bool {
6841 let current_timers = self.timers.values();
6842 current_timers
6843 .into_iter()
6844 .filter(|(timer, _)| {
6845 !matches!(
6846 timer,
6847 Timer::Conn(ConnTimer::KeepAlive)
6848 | Timer::PerPath(_, PathTimer::PathKeepAlive)
6849 | Timer::Conn(ConnTimer::PushNewCid)
6850 | Timer::Conn(ConnTimer::KeyDiscard)
6851 )
6852 })
6853 .min_by_key(|(_, time)| *time)
6854 .is_none_or(|(timer, _)| {
6855 matches!(
6856 timer,
6857 Timer::Conn(ConnTimer::Idle) | Timer::PerPath(_, PathTimer::PathIdle)
6858 )
6859 })
6860 }
6861
6862 #[cfg(test)]
6864 pub(crate) fn using_ecn(&self) -> bool {
6865 self.path_data(PathId::ZERO).sending_ecn
6866 }
6867
6868 #[cfg(test)]
6870 pub(crate) fn total_recvd(&self) -> u64 {
6871 self.path_data(PathId::ZERO).total_recvd
6872 }
6873
6874 #[cfg(test)]
6875 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6876 self.local_cid_state
6877 .get(&PathId::ZERO)
6878 .unwrap()
6879 .active_seq()
6880 }
6881
6882 #[cfg(test)]
6883 #[track_caller]
6884 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6885 self.local_cid_state
6886 .get(&PathId(path_id))
6887 .unwrap()
6888 .active_seq()
6889 }
6890
6891 #[cfg(test)]
6894 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6895 let n = self
6896 .local_cid_state
6897 .get_mut(&PathId::ZERO)
6898 .unwrap()
6899 .assign_retire_seq(v);
6900 debug_assert!(!self.state.is_drained()); self.endpoint_events
6902 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6903 }
6904
6905 #[cfg(test)]
6907 pub(crate) fn active_remote_cid_seq(&self) -> u64 {
6908 self.remote_cids.get(&PathId::ZERO).unwrap().active_seq()
6909 }
6910
6911 #[cfg(test)]
6913 pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6914 self.path_data(path_id).current_mtu()
6915 }
6916
6917 #[cfg(test)]
6919 pub(crate) fn trigger_path_validation(&mut self) {
6920 for path in self.paths.values_mut() {
6921 path.data.pending_on_path_challenge = true;
6922 }
6923 }
6924
6925 #[cfg(test)]
6927 pub fn simulate_protocol_violation(&mut self, now: Instant) {
6928 if !self.state.is_closed() {
6929 self.state
6930 .move_to_closed(TransportError::PROTOCOL_VIOLATION("simulated violation"));
6931 self.close_common();
6932 if !self.state.is_drained() {
6933 self.set_close_timer(now);
6934 }
6935 self.connection_close_pending = true;
6936 }
6937 }
6938
6939 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6950 let space_specific = self.paths.get(&path_id).is_some_and(|path| {
6951 path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6952 });
6953
6954 let other = self.streams.can_send_stream_data()
6956 || self
6957 .datagrams
6958 .outgoing
6959 .front()
6960 .is_some_and(|x| x.size(true) <= max_size);
6961
6962 SendableFrames {
6964 acks: false,
6965 close: false,
6966 space_specific,
6967 other,
6968 }
6969 }
6970
6971 fn kill(&mut self, reason: ConnectionError) {
6973 self.close_common();
6974 let was_draining = self.state.move_to_drained(Some(reason));
6975 if !was_draining {
6976 self.endpoint_events.push_back(EndpointEventInner::Draining);
6977 }
6978 self.endpoint_events.push_back(EndpointEventInner::Drained);
6981 }
6982
6983 pub fn current_mtu(&self) -> u16 {
6990 self.paths
6991 .iter()
6992 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6993 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6994 .min()
6995 .unwrap_or(INITIAL_MTU)
6996 }
6997
6998 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
7005 let pn_len = PacketNumber::new(
7006 pn,
7007 self.spaces[SpaceId::Data]
7008 .for_path(path)
7009 .largest_acked_packet_pn
7010 .unwrap_or(0),
7011 )
7012 .len();
7013
7014 1 + self
7016 .remote_cids
7017 .get(&path)
7018 .map(|cids| cids.active().len())
7019 .unwrap_or(20) + pn_len
7021 + self.tag_len_1rtt()
7022 }
7023
7024 fn predict_1rtt_overhead_no_pn(&self) -> usize {
7025 let pn_len = 4;
7026
7027 let cid_len = self
7028 .remote_cids
7029 .values()
7030 .map(|cids| cids.active().len())
7031 .max()
7032 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
7036 }
7037
7038 fn tag_len_1rtt(&self) -> usize {
7039 let packet_crypto = self
7041 .crypto_state
7042 .encryption_keys(SpaceKind::Data, self.side.side())
7043 .map(|(_header, packet, _level)| packet);
7044 packet_crypto.map_or(16, |x| x.tag_len())
7048 }
7049
7050 fn on_path_validated(&mut self, path_id: PathId) {
7052 self.path_data_mut(path_id).validated = true;
7053 let ConnectionSide::Server { server_config } = &self.side else {
7054 return;
7055 };
7056 let network_path = self.path_data(path_id).network_path;
7057 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
7058 new_tokens.clear();
7059 for _ in 0..server_config.validation_token.sent {
7060 new_tokens.push(network_path);
7061 }
7062 }
7063
7064 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
7066 if let Some(path) = self.paths.get_mut(&path_id) {
7067 path.data.status.remote_update(status, status_seq_no);
7068 } else {
7069 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
7070 }
7071 self.events.push_back(
7072 PathEvent::RemoteStatus {
7073 id: path_id,
7074 status,
7075 }
7076 .into(),
7077 );
7078 }
7079
7080 fn max_path_id(&self) -> Option<PathId> {
7089 if self.is_multipath_negotiated() {
7090 Some(self.remote_max_path_id.min(self.local_max_path_id))
7091 } else {
7092 None
7093 }
7094 }
7095
7096 pub(crate) fn is_ipv6(&self) -> bool {
7101 self.paths
7102 .values()
7103 .any(|p| p.data.network_path.remote.is_ipv6())
7104 }
7105
7106 pub fn add_nat_traversal_address(
7108 &mut self,
7109 address: SocketAddr,
7110 ) -> Result<(), n0_nat_traversal::Error> {
7111 if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
7112 self.spaces[SpaceId::Data].pending.add_address.insert(added);
7113 };
7114 Ok(())
7115 }
7116
7117 pub fn remove_nat_traversal_address(
7121 &mut self,
7122 address: SocketAddr,
7123 ) -> Result<(), n0_nat_traversal::Error> {
7124 if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
7125 self.spaces[SpaceId::Data]
7126 .pending
7127 .remove_address
7128 .insert(removed);
7129 }
7130 Ok(())
7131 }
7132
7133 pub fn get_local_nat_traversal_addresses(
7135 &self,
7136 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
7137 self.n0_nat_traversal.get_local_nat_traversal_addresses()
7138 }
7139
7140 pub fn get_remote_nat_traversal_addresses(
7142 &self,
7143 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
7144 Ok(self
7145 .n0_nat_traversal
7146 .client_side()?
7147 .get_remote_nat_traversal_addresses())
7148 }
7149
7150 pub fn initiate_nat_traversal_round(
7162 &mut self,
7163 now: Instant,
7164 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
7165 if self.state.is_closed() {
7166 return Err(n0_nat_traversal::Error::Closed);
7167 }
7168
7169 let ipv6 = self.is_ipv6();
7170 let client_state = self.n0_nat_traversal.client_side_mut()?;
7171 let (mut reach_out_frames, probed_addrs) =
7172 client_state.initiate_nat_traversal_round(ipv6)?;
7173 if let Some(delay) = self.n0_nat_traversal.retry_delay(self.config.initial_rtt) {
7174 self.timers.set(
7175 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
7176 now + delay,
7177 self.qlog.with_time(now),
7178 );
7179 }
7180
7181 self.spaces[SpaceId::Data]
7182 .pending
7183 .reach_out
7184 .append(&mut reach_out_frames);
7185
7186 Ok(probed_addrs)
7187 }
7188
7189 fn is_handshake_confirmed(&self) -> bool {
7198 !self.is_handshaking() && !self.crypto_state.has_keys(EncryptionLevel::Handshake)
7199 }
7200}
7201
7202impl fmt::Debug for Connection {
7203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7204 f.debug_struct("Connection")
7205 .field("handshake_cid", &self.handshake_cid)
7206 .finish()
7207 }
7208}
7209
/// Application-provided hint consulted about paths after a network change.
///
/// NOTE(review): when and how the connection consults this trait is not visible in this
/// part of the file — confirm against the caller.
pub trait NetworkChangeHint: std::fmt::Debug + 'static {
    /// Returns whether the path identified by `path_id`, running over `network_path`,
    /// is considered recoverable.
    fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
}
7222
/// Outcome of polling one packet space of one path for something to transmit.
///
/// NOTE(review): the producer and consumer of this type are outside this part of the
/// file; the variant descriptions below are derived from names only — confirm.
#[derive(Debug)]
enum PollPathSpaceStatus {
    /// Nothing was written.
    NothingToSend {
        /// Presumably set when congestion control was what prevented sending — confirm.
        congestion_blocked: bool,
    },
    /// A packet was written.
    WrotePacket {
        /// Number of the last packet written.
        last_packet_number: u64,
        /// Padding request for the datagram containing the packet.
        pad_datagram: PadDatagram,
    },
    /// Presumably signals that the datagram built so far should be sent — confirm.
    Send {
        /// Number of the last packet written.
        last_packet_number: u64,
    },
}
7261
/// Per-path flags used when deciding what may be sent on a path.
///
/// NOTE(review): the code computing and reading these flags is outside this part of the
/// file; field descriptions are derived from names only — confirm.
#[derive(Debug, Copy, Clone)]
struct PathSchedulingInfo {
    /// Whether the path is abandoned.
    is_abandoned: bool,
    /// Whether data may be sent on this path.
    may_send_data: bool,
    /// Whether a close may be sent on this path.
    may_send_close: bool,
    /// Presumably whether this path may carry its own abandon notification — confirm.
    may_self_abandon: bool,
}
7301
/// Reason, if any, why sending on a path is currently blocked.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Blocked by the anti-amplification limit.
    AntiAmplification,
    /// Blocked by congestion control.
    Congestion,
    /// Blocked by pacing.
    Pacing,
}
7309
/// Side-specific state of a connection.
enum ConnectionSide {
    /// State held by a client connection.
    Client {
        /// Token taken from `token_store` for `server_name` when the connection state is
        /// built; empty when the store had none.
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server, used as the key into `token_store`.
        server_name: String,
    },
    /// State held by a server connection.
    Server {
        /// The server's configuration.
        server_config: Arc<ServerConfig>,
    },
}
7322
7323impl ConnectionSide {
7324 fn is_client(&self) -> bool {
7325 self.side().is_client()
7326 }
7327
7328 fn is_server(&self) -> bool {
7329 self.side().is_server()
7330 }
7331
7332 fn side(&self) -> Side {
7333 match *self {
7334 Self::Client { .. } => Side::Client,
7335 Self::Server { .. } => Side::Server,
7336 }
7337 }
7338}
7339
7340impl From<SideArgs> for ConnectionSide {
7341 fn from(side: SideArgs) -> Self {
7342 match side {
7343 SideArgs::Client {
7344 token_store,
7345 server_name,
7346 } => Self::Client {
7347 token: token_store.take(&server_name).unwrap_or_default(),
7348 token_store,
7349 server_name,
7350 },
7351 SideArgs::Server {
7352 server_config,
7353 pref_addr_cid: _,
7354 path_validated: _,
7355 } => Self::Server { server_config },
7356 }
7357 }
7358}
7359
/// Side-specific arguments used to construct a connection.
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// The server's configuration.
        server_config: Arc<ServerConfig>,
        /// Connection ID associated with the preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the initial path is already validated.
        path_validated: bool,
    },
}
7372
7373impl SideArgs {
7374 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
7375 match *self {
7376 Self::Client { .. } => None,
7377 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7378 }
7379 }
7380
7381 pub(crate) fn path_validated(&self) -> bool {
7382 match *self {
7383 Self::Client { .. } => true,
7384 Self::Server { path_validated, .. } => path_validated,
7385 }
7386 }
7387
7388 pub(crate) fn side(&self) -> Side {
7389 match *self {
7390 Self::Client { .. } => Side::Client,
7391 Self::Server { .. } => Side::Server,
7392 }
7393 }
7394}
7395
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed transparently as the wrapped error.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The connection was aborted by the peer with a connection-level close.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The connection was closed by the peer with an application-level close.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed.
    ///
    /// NOTE(review): the variant name indicates the close was initiated locally — confirm.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
7430
7431impl From<Close> for ConnectionError {
7432 fn from(x: Close) -> Self {
7433 match x {
7434 Close::Connection(reason) => Self::ConnectionClosed(reason),
7435 Close::Application(reason) => Self::ApplicationClosed(reason),
7436 }
7437 }
7438}
7439
7440impl From<ConnectionError> for io::Error {
7442 fn from(x: ConnectionError) -> Self {
7443 use ConnectionError::*;
7444 let kind = match x {
7445 TimedOut => io::ErrorKind::TimedOut,
7446 Reset => io::ErrorKind::ConnectionReset,
7447 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7448 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7449 io::ErrorKind::Other
7450 }
7451 };
7452 Self::new(kind, x)
7453 }
7454}
7455
7456#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7459pub enum PathError {
7460 #[error("multipath extension not negotiated")]
7462 MultipathNotNegotiated,
7463 #[error("the server side may not open a path")]
7465 ServerSideNotAllowed,
7466 #[error("maximum number of concurrent paths reached")]
7468 MaxPathIdReached,
7469 #[error("remoted CIDs exhausted")]
7471 RemoteCidsExhausted,
7472 #[error("path validation failed")]
7474 ValidationFailed,
7475 #[error("invalid remote address")]
7477 InvalidRemoteAddress(SocketAddr),
7478}
7479
/// Errors related to closing a path.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The multipath extension was not negotiated.
    #[error("Multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// The path is closed.
    #[error("closed path")]
    ClosedPath,
    /// The path is the last open path.
    #[error("last open path")]
    LastOpenPath,
}
7495
/// Error indicating that the multipath extension was not negotiated.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private unit field: the struct cannot be constructed with a struct literal outside
    // this module.
    _private: (),
}
7502
/// Events of interest to the application.
///
/// NOTE(review): the conditions under which each event is emitted are mostly outside
/// this part of the file; descriptions follow the variant names — confirm.
#[derive(Debug)]
pub enum Event {
    /// Handshake data is ready.
    HandshakeDataReady,
    /// The connection was established.
    Connected,
    /// The handshake was confirmed.
    HandshakeConfirmed,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// A datagram was received.
    DatagramReceived,
    /// Datagram sending is no longer blocked.
    DatagramsUnblocked,
    /// A path-related event.
    Path(PathEvent),
    /// A NAT traversal event.
    NatTraversal(n0_nat_traversal::Event),
}
7533
7534impl From<PathEvent> for Event {
7535 fn from(source: PathEvent) -> Self {
7536 Self::Path(source)
7537 }
7538}
7539
7540fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7541 Duration::from_micros(params.max_ack_delay.0 * 1000)
7542}
7543
/// Upper bound for a backoff exponent.
///
/// NOTE(review): presumably caps the exponential PTO backoff — confirm at the use site.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Two-second ceiling for a PTO interval.
///
/// NOTE(review): where this ceiling is applied is outside this part of the file — confirm.
const MAX_PTO_INTERVAL: Duration = Duration::from_secs(2);

/// Minimum idle duration (25 s) associated with the fast PTO behaviour.
///
/// NOTE(review): exact semantics are defined at the use site — confirm.
const MIN_IDLE_FOR_FAST_PTO: Duration = Duration::from_secs(25);

/// One-second ceiling for the fast PTO interval.
///
/// NOTE(review): exact semantics are defined at the use site — confirm.
const MAX_PTO_FAST_INTERVAL: Duration = Duration::from_secs(1);

/// RTT threshold equal to two thirds of [`MAX_PTO_INTERVAL`], computed in whole
/// milliseconds.
const SLOW_RTT_THRESHOLD: Duration =
    Duration::from_millis((MAX_PTO_INTERVAL.as_millis() as u64 * 2) / 3);

/// Minimum packet space: the largest handshake/0-RTT header plus 32 bytes.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest header size assumed for a Handshake or 0-RTT packet.
///
/// The terms are, in order: 1 + 4 + 1 + `MAX_CID_SIZE` + 1 + `MAX_CID_SIZE` + the encoded
/// size of a varint holding `u16::MAX` + 4.
///
/// NOTE(review): this presumably mirrors the QUIC long-header layout (first byte, version,
/// DCID length, DCID, SCID length, SCID, length field, packet number) — confirm.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7584
/// Record of the frames written into a packet, filled in by `record_sent_frame`.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable frame data recorded for the packet.
    retransmits: ThinRetransmits,
    /// For each path that was acknowledged in the packet, the largest packet number
    /// covered by the ACK ranges.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the stream frames written.
    stream_frames: StreamMetaVec,
    /// Set when a frame was written that is not tracked in `retransmits` (PATH_RESPONSE,
    /// PING, IMMEDIATE_ACK, PATH_CHALLENGE or DATAGRAM).
    non_retransmits: bool,
    /// NOTE(review): not written anywhere in this part of the file; presumably requests
    /// padding of the packet — confirm.
    requires_padding: bool,
}
7596
7597impl SentFrames {
7598 fn is_ack_only(&self, streams: &StreamsState) -> bool {
7600 !self.largest_acked.is_empty()
7601 && !self.non_retransmits
7602 && self.stream_frames.is_empty()
7603 && self.retransmits.is_empty(streams)
7604 }
7605
7606 fn retransmits_mut(&mut self) -> &mut Retransmits {
7607 self.retransmits.get_or_create()
7608 }
7609
7610 fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
7611 use frame::EncodableFrame::*;
7612 match frame {
7613 PathAck(path_ack_encoder) => {
7614 if let Some(max) = path_ack_encoder.ranges.max() {
7615 self.largest_acked.insert(path_ack_encoder.path_id, max);
7616 }
7617 }
7618 Ack(ack_encoder) => {
7619 if let Some(max) = ack_encoder.ranges.max() {
7620 self.largest_acked.insert(PathId::ZERO, max);
7621 }
7622 }
7623 Close(_) => { }
7624 PathResponse(_) => self.non_retransmits = true,
7625 HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
7626 ReachOut(frame) => self.retransmits_mut().reach_out.push(frame),
7627 ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
7628 Ping(_) => self.non_retransmits = true,
7629 ImmediateAck(_) => self.non_retransmits = true,
7630 AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
7631 PathChallenge(_) => self.non_retransmits = true,
7632 Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
7633 PathAbandon(path_abandon) => {
7634 self.retransmits_mut()
7635 .path_abandon
7636 .entry(path_abandon.path_id)
7637 .or_insert(path_abandon.error_code);
7638 }
7639 PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
7640 | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
7641 self.retransmits_mut().path_status.insert(path_id);
7642 }
7643 MaxPathId(_) => self.retransmits_mut().max_path_id = true,
7644 PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
7645 PathCidsBlocked(path_cids_blocked) => {
7646 self.retransmits_mut()
7647 .path_cids_blocked
7648 .insert(path_cids_blocked.path_id);
7649 }
7650 ResetStream(reset) => self
7651 .retransmits_mut()
7652 .reset_stream
7653 .push((reset.id, reset.error_code)),
7654 StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
7655 NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
7656 RetireConnectionId(retire_cid) => self
7657 .retransmits_mut()
7658 .retire_cids
7659 .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
7660 Datagram(_) => self.non_retransmits = true,
7661 NewToken(_) => {}
7662 AddAddress(add_address) => {
7663 self.retransmits_mut().add_address.insert(add_address);
7664 }
7665 RemoveAddress(remove_address) => {
7666 self.retransmits_mut().remove_address.insert(remove_address);
7667 }
7668 StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
7669 MaxData(_) => self.retransmits_mut().max_data = true,
7670 MaxStreamData(max) => {
7671 self.retransmits_mut().max_stream_data.insert(max.id);
7672 }
7673 MaxStreams(max_streams) => {
7674 self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
7675 }
7676 StreamsBlocked(streams_blocked) => {
7677 self.retransmits_mut().streams_blocked[streams_blocked.dir as usize] = true
7678 }
7679 }
7680 }
7681}
7682
7683fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7691 match (x, y) {
7692 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7693 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7694 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7695 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7696 }
7697}
7698
#[cfg(test)]
mod tests {
    use super::*;

    /// The negotiated idle timeout must not depend on the order of its arguments.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let millis = |n: u64| Some(Duration::from_millis(n));
        let cases = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), millis(2)),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (Some(VarInt(2)), Some(VarInt(0)), millis(2)),
            (Some(VarInt(1)), Some(VarInt(4)), millis(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}