1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 sync::Arc,
9};
10
11use bytes::{Bytes, BytesMut};
12use frame::StreamMetaVec;
13
14use rand::{RngExt, SeedableRng, rngs::StdRng};
15use rustc_hash::{FxHashMap, FxHashSet};
16use thiserror::Error;
17use tracing::{debug, error, trace, trace_span, warn};
18
19use crate::{
20 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
21 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
22 TransportError, TransportErrorCode, VarInt,
23 cid_generator::ConnectionIdGenerator,
24 cid_queue::CidQueue,
25 config::{ServerConfig, TransportConfig},
26 congestion::Controller,
27 connection::{
28 qlog::{QlogRecvPacket, QlogSink},
29 spaces::LostPacket,
30 stats::PathStatsMap,
31 timer::{ConnTimer, PathTimer},
32 },
33 crypto::{self, Keys},
34 frame::{
35 self, Close, DataBlocked, Datagram, FrameStruct, NewToken, ObservedAddr, StreamDataBlocked,
36 StreamsBlocked,
37 },
38 n0_nat_traversal,
39 packet::{
40 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
41 PacketNumber, PartialDecode, SpaceId,
42 },
43 range_set::ArrayRangeSet,
44 shared::{
45 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
46 EndpointEvent, EndpointEventInner,
47 },
48 token::{ResetToken, Token, TokenPayload},
49 transport_parameters::TransportParameters,
50};
51
52mod ack_frequency;
53use ack_frequency::AckFrequencyState;
54
55mod assembler;
56pub use assembler::Chunk;
57
58mod cid_state;
59use cid_state::CidState;
60
61mod datagrams;
62use datagrams::DatagramState;
63pub use datagrams::{Datagrams, SendDatagramError};
64
65mod mtud;
66mod pacing;
67
68mod packet_builder;
69use packet_builder::{PacketBuilder, PadDatagram};
70
71mod packet_crypto;
72use packet_crypto::CryptoState;
73pub(crate) use packet_crypto::EncryptionLevel;
74
75mod paths;
76pub use paths::{
77 ClosedPath, PathAbandonReason, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError,
78};
79use paths::{PathData, PathState};
80
81pub(crate) mod qlog;
82pub(crate) mod send_buffer;
83
84mod spaces;
85#[cfg(fuzzing)]
86pub use spaces::Retransmits;
87#[cfg(not(fuzzing))]
88use spaces::Retransmits;
89pub(crate) use spaces::SpaceKind;
90use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
91
92mod stats;
93pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
94
95mod streams;
96#[cfg(fuzzing)]
97pub use streams::StreamsState;
98#[cfg(not(fuzzing))]
99use streams::StreamsState;
100pub use streams::{
101 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
102 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
103};
104
105mod timer;
106use timer::{Timer, TimerTable};
107
108mod transmit_buf;
109use transmit_buf::TransmitBuf;
110
111mod state;
112
113#[cfg(not(fuzzing))]
114use state::State;
115#[cfg(fuzzing)]
116pub use state::State;
117use state::StateType;
118
/// Protocol state of a single connection.
///
/// Bundles the configuration, crypto state, per-path data, packet number spaces, timers,
/// stream/datagram state and the event queues that are drained through
/// [`Connection::poll`] and [`Connection::poll_endpoint_events`].
pub struct Connection {
    /// Endpoint configuration handed to [`Connection::new`].
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration handed to [`Connection::new`].
    config: Arc<TransportConfig>,
    /// Random number generator, seeded from the `rng_seed` passed to [`Connection::new`].
    rng: StdRng,
    /// Crypto session and key state, queried via `has_keys` when building packets.
    crypto_state: CryptoState,
    /// Initialised from the `local_cid` argument of [`Connection::new`].
    handshake_cid: ConnectionId,
    /// Initialised from the `remote_cid` argument of [`Connection::new`].
    remote_handshake_cid: ConnectionId,
    /// All known paths keyed by id. Abandoned paths may still be present here; see
    /// `abandoned_paths`.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented (wrapping) each time `ensure_path` creates a path; the new value is
    /// passed to `PathData::new`.
    path_generation_counter: u64,
    /// Passed to `PathData::new` for every path; presumably whether MTU discovery is
    /// permitted (confirm against `PathData`).
    allow_mtud: bool,
    /// Connection state machine; starts in the handshake state.
    state: State,
    /// Side-specific connection data (client or server).
    side: ConnectionSide,
    /// Peer transport parameters; `TransportParameters::default()` until updated elsewhere.
    peer_params: TransportParameters,
    /// Initialised from the `remote_cid` argument of [`Connection::new`].
    original_remote_cid: ConnectionId,
    /// Initialised from the `init_cid` argument of [`Connection::new`].
    initial_dst_cid: ConnectionId,
    /// `None` at construction; presumably set when a Retry is processed (not shown here).
    retry_src_cid: Option<ConnectionId>,
    /// Events returned first by [`Connection::poll`].
    events: VecDeque<Event>,
    /// Events returned by [`Connection::poll_endpoint_events`].
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Set at construction to `config.allow_spin && rng.random_ratio(7, 8)`.
    spin_enabled: bool,
    /// Starts as `false`; presumably the current spin bit value (confirm).
    spin: bool,
    /// Packet spaces in the order Initial, Handshake, Data.
    spaces: [PacketSpace; 3],
    /// Starts at `SpaceKind::Initial`; compared against `SpaceKind::Data` before sending
    /// ACK_FREQUENCY.
    highest_space: SpaceKind,
    /// `config.max_idle_timeout` in milliseconds, or `None` when unset or zero.
    idle_timeout: Option<Duration>,
    /// Table of connection-level and per-path timers.
    timers: TimerTable,
    /// Counter starting at zero; presumably counts packets that failed authentication.
    authentication_failures: u64,

    /// While closed or draining, `poll_transmit` only produces packets when this is set.
    connection_close_pending: bool,

    /// ACK frequency state, initialised from the default transport parameters' max ack
    /// delay.
    ack_frequency: AckFrequencyState,

    /// Passed to `populate_acks` when writing ACK frames.
    receiving_ecn: bool,
    /// Counter starting at zero; presumably the number of authenticated packets received.
    total_authed_packets: u64,

    /// Starts at zero; presumably the next sequence number for observed-address frames.
    next_observed_addr_seq_no: VarInt,

    /// Stream state, exposed through `streams`, `recv_stream` and `send_stream`.
    streams: StreamsState,
    /// Remote connection IDs per path. A path without an entry cannot be sent on.
    remote_cids: FxHashMap<PathId, CidQueue>,
    /// Locally issued connection ID state per path.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// Datagram extension state.
    datagrams: DatagramState,
    /// Per-path statistics, updated e.g. in `build_transmit`.
    path_stats: PathStatsMap,
    /// Connection statistics accumulator; NOTE(review): exact aggregation semantics are
    /// not visible in this part of the file.
    partial_stats: ConnectionStats,
    /// Version value passed to [`Connection::new`].
    version: u32,

    /// Starts at `NonZeroU32::MIN` (one path).
    max_concurrent_paths: NonZeroU32,
    /// Starts at `PathId::ZERO`.
    local_max_path_id: PathId,
    /// Starts at `PathId::ZERO`; `open_path` refuses ids above this and queues a
    /// paths-blocked signal instead.
    remote_max_path_id: PathId,
    /// Starts at `PathId::ZERO`; presumably the highest path id for which CIDs were
    /// issued (confirm).
    max_path_id_with_cids: PathId,
    /// Ids of paths that `abandon_path` has been called for.
    abandoned_paths: FxHashSet<PathId>,

    /// NAT traversal state.
    n0_nat_traversal: n0_nat_traversal::State,
    /// Sink for qlog events.
    qlog: QlogSink,
}
316
317impl Connection {
/// Creates the connection state with a single path, [`PathId::ZERO`], on `network_path`.
///
/// - `init_cid` seeds the crypto state and is stored as `initial_dst_cid`.
/// - `local_cid` is stored as `handshake_cid`.
/// - `remote_cid` is stored as `remote_handshake_cid` and `original_remote_cid`, and is
///   the first entry of the remote CID queue for path zero.
/// - `rng_seed` seeds the connection's random number generator.
/// - `side_args` determines the side (client/server), whether path zero starts out
///   validated, and whether a preferred-address CID exists.
///
/// On the client side the initial crypto data is written and 0-RTT is initialised before
/// returning.
pub(crate) fn new(
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    init_cid: ConnectionId,
    local_cid: ConnectionId,
    remote_cid: ConnectionId,
    network_path: FourTuple,
    crypto: Box<dyn crypto::Session>,
    cid_gen: &dyn ConnectionIdGenerator,
    now: Instant,
    version: u32,
    allow_mtud: bool,
    rng_seed: [u8; 32],
    side_args: SideArgs,
    qlog: QlogSink,
) -> Self {
    // Read everything needed from `side_args` before it is consumed by the conversion.
    let pref_addr_cid = side_args.pref_addr_cid();
    let path_validated = side_args.path_validated();
    let connection_side = ConnectionSide::from(side_args);
    let side = connection_side.side();
    // NOTE: the order in which `rng` is used below determines the values each consumer
    // sees for a given seed; keep it stable.
    let mut rng = StdRng::from_seed(rng_seed);
    let initial_space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
    let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
    // Tests may request deterministic packet numbers for the data space.
    #[cfg(test)]
    let data_space = match config.deterministic_packet_numbers {
        true => PacketSpace::new_deterministic(now, SpaceId::Data),
        false => PacketSpace::new(now, SpaceId::Data, &mut rng),
    };
    #[cfg(not(test))]
    let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
    let state = State::handshake(state::Handshake {
        remote_cid_set: side.is_server(),
        expected_token: Bytes::new(),
        client_hello: None,
        allow_server_migration: side.is_client(),
    });
    // The last argument is 2 when a preferred-address CID exists and 1 otherwise.
    let local_cid_state = FxHashMap::from_iter([(
        PathId::ZERO,
        CidState::new(
            cid_gen.cid_len(),
            cid_gen.cid_lifetime(),
            now,
            if pref_addr_cid.is_some() { 2 } else { 1 },
        ),
    )]);

    // Path zero uses generation 0 and no peer-provided maximum UDP payload size.
    let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
    path.open_status = paths::OpenStatus::Informed;
    let mut this = Self {
        endpoint_config,
        crypto_state: CryptoState::new(crypto, init_cid, side, &mut rng),
        handshake_cid: local_cid,
        remote_handshake_cid: remote_cid,
        local_cid_state,
        paths: BTreeMap::from_iter([(
            PathId::ZERO,
            PathState {
                data: path,
                prev: None,
            },
        )]),
        path_generation_counter: 0,
        allow_mtud,
        state,
        side: connection_side,
        peer_params: TransportParameters::default(),
        original_remote_cid: remote_cid,
        initial_dst_cid: init_cid,
        retry_src_cid: None,
        events: VecDeque::new(),
        endpoint_events: VecDeque::new(),
        // Even when spinning is allowed it is only enabled with probability 7/8.
        spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
        spin: false,
        spaces: [initial_space, handshake_space, data_space],
        highest_space: SpaceKind::Initial,
        // An unset or zero idle timeout means "no idle timeout".
        idle_timeout: match config.max_idle_timeout {
            None | Some(VarInt(0)) => None,
            Some(dur) => Some(Duration::from_millis(dur.0)),
        },
        timers: TimerTable::default(),
        authentication_failures: 0,
        connection_close_pending: false,

        // The peer's parameters are unknown at this point, so start from the defaults.
        ack_frequency: AckFrequencyState::new(get_max_ack_delay(
            &TransportParameters::default(),
        )),

        receiving_ecn: false,
        total_authed_packets: 0,

        next_observed_addr_seq_no: 0u32.into(),

        streams: StreamsState::new(
            side,
            config.max_concurrent_uni_streams,
            config.max_concurrent_bidi_streams,
            config.send_window,
            config.receive_window,
            config.stream_receive_window,
        ),
        datagrams: DatagramState::default(),
        // `config` and `rng` are moved only after their last use in the fields above.
        config,
        remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]),
        rng,
        path_stats: Default::default(),
        partial_stats: ConnectionStats::default(),
        version,

        max_concurrent_paths: NonZeroU32::MIN,
        local_max_path_id: PathId::ZERO,
        remote_max_path_id: PathId::ZERO,
        max_path_id_with_cids: PathId::ZERO,
        abandoned_paths: Default::default(),

        n0_nat_traversal: Default::default(),
        qlog,
    };
    if path_validated {
        this.on_path_validated(PathId::ZERO);
    }
    if side.is_client() {
        // The client starts the handshake: queue its first crypto data and set up 0-RTT.
        this.write_crypto();
        this.init_0rtt(now);
    }
    this.qlog
        .emit_tuple_assigned(PathId::ZERO, network_path, now);
    this
}
448
449 #[must_use]
457 pub fn poll_timeout(&mut self) -> Option<Instant> {
458 self.timers.peek()
459 }
460
461 #[must_use]
467 pub fn poll(&mut self) -> Option<Event> {
468 if let Some(x) = self.events.pop_front() {
469 return Some(x);
470 }
471
472 if let Some(event) = self.streams.poll() {
473 return Some(Event::Stream(event));
474 }
475
476 if let Some(reason) = self.state.take_error() {
477 return Some(Event::ConnectionLost { reason });
478 }
479
480 None
481 }
482
483 #[must_use]
485 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
486 self.endpoint_events.pop_front().map(EndpointEvent)
487 }
488
489 #[must_use]
491 pub fn streams(&mut self) -> Streams<'_> {
492 Streams {
493 state: &mut self.streams,
494 conn_state: &self.state,
495 }
496 }
497
498 #[must_use]
500 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
501 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
502 RecvStream {
503 id,
504 state: &mut self.streams,
505 pending: &mut self.spaces[SpaceId::Data].pending,
506 }
507 }
508
509 #[must_use]
511 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
512 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
513 SendStream {
514 id,
515 state: &mut self.streams,
516 pending: &mut self.spaces[SpaceId::Data].pending,
517 conn_state: &self.state,
518 }
519 }
520
521 pub fn open_path_ensure(
538 &mut self,
539 network_path: FourTuple,
540 initial_status: PathStatus,
541 now: Instant,
542 ) -> Result<(PathId, bool), PathError> {
543 let existing_open_path = self.paths.iter().find(|(id, path)| {
544 network_path.is_probably_same_path(&path.data.network_path)
545 && !self.abandoned_paths.contains(*id)
546 });
547 match existing_open_path {
548 Some((path_id, _state)) => Ok((*path_id, true)),
549 None => Ok((self.open_path(network_path, initial_status, now)?, false)),
550 }
551 }
552
553 pub fn open_path(
558 &mut self,
559 network_path: FourTuple,
560 initial_status: PathStatus,
561 now: Instant,
562 ) -> Result<PathId, PathError> {
563 if !self.is_multipath_negotiated() {
564 return Err(PathError::MultipathNotNegotiated);
565 }
566 if self.side().is_server() {
567 return Err(PathError::ServerSideNotAllowed);
568 }
569
570 let max_abandoned = self.abandoned_paths.iter().max().copied();
571 let max_used = self.paths.keys().last().copied();
572 let path_id = max_abandoned
573 .max(max_used)
574 .unwrap_or(PathId::ZERO)
575 .saturating_add(1u8);
576
577 if Some(path_id) > self.max_path_id() {
578 return Err(PathError::MaxPathIdReached);
579 }
580 if path_id > self.remote_max_path_id {
581 self.spaces[SpaceId::Data].pending.paths_blocked = true;
582 return Err(PathError::MaxPathIdReached);
583 }
584 if self
585 .remote_cids
586 .get(&path_id)
587 .map(CidQueue::active)
588 .is_none()
589 {
590 self.spaces[SpaceId::Data]
591 .pending
592 .path_cids_blocked
593 .insert(path_id);
594 return Err(PathError::RemoteCidsExhausted);
595 }
596
597 let path = self.ensure_path(path_id, network_path, now, None);
598 path.status.local_update(initial_status);
599
600 Ok(path_id)
601 }
602
603 pub fn close_path(
609 &mut self,
610 now: Instant,
611 path_id: PathId,
612 error_code: VarInt,
613 ) -> Result<(), ClosePathError> {
614 self.close_path_inner(
615 now,
616 path_id,
617 PathAbandonReason::ApplicationClosed { error_code },
618 )
619 }
620
621 pub(crate) fn close_path_inner(
626 &mut self,
627 now: Instant,
628 path_id: PathId,
629 reason: PathAbandonReason,
630 ) -> Result<(), ClosePathError> {
631 if self.state.is_drained() {
632 return Ok(());
633 }
634
635 if !self.is_multipath_negotiated() {
636 return Err(ClosePathError::MultipathNotNegotiated);
637 }
638 if self.abandoned_paths.contains(&path_id)
639 || Some(path_id) > self.max_path_id()
640 || !self.paths.contains_key(&path_id)
641 {
642 return Err(ClosePathError::ClosedPath);
643 }
644
645 let is_last_path = !self
646 .paths
647 .keys()
648 .any(|id| *id != path_id && !self.abandoned_paths.contains(id));
649
650 if is_last_path && !reason.is_remote() {
651 return Err(ClosePathError::LastOpenPath);
652 }
653
654 self.abandon_path(now, path_id, reason);
655
656 if is_last_path {
660 let rtt = RttEstimator::new(self.config.initial_rtt);
664 let pto = rtt.pto_base() + self.ack_frequency.max_ack_delay_for_pto();
665 let grace = pto * 3;
666 self.timers.set(
667 Timer::Conn(ConnTimer::NoAvailablePath),
668 now + grace,
669 self.qlog.with_time(now),
670 );
671 }
672
673 Ok(())
674 }
675
/// Marks `path_id` as abandoned and cleans up state tied to it.
///
/// Queues a path-abandon signal with the reason's error code, drops queued and in-flight
/// per-path signals (new CIDs, CIDs-blocked, path status) for this path, clears its loss
/// probes, asks the endpoint to retire the path's reset token, stops most per-path
/// timers, and finally emits [`PathEvent::Abandoned`].
///
/// Callers are expected to have validated the request; see `close_path_inner`.
fn abandon_path(&mut self, now: Instant, path_id: PathId, reason: PathAbandonReason) {
    trace!(%path_id, ?reason, "abandoning path");

    let pending_space = &mut self.spaces[SpaceId::Data].pending;
    // Queue the abandon signal itself.
    pending_space
        .path_abandon
        .insert(path_id, reason.error_code());

    // Anything else still queued for this path is now pointless.
    pending_space.new_cids.retain(|cid| cid.path_id != path_id);
    pending_space.path_cids_blocked.retain(|&id| id != path_id);
    pending_space.path_status.retain(|&id| id != path_id);

    // The same applies to retransmittable data attached to packets already sent, on any
    // path's packet number space.
    for space in self.spaces[SpaceId::Data].iter_paths_mut() {
        for sent_packet in space.sent_packets.values_mut() {
            if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                retransmits.path_cids_blocked.retain(|&id| id != path_id);
                retransmits.path_status.retain(|&id| id != path_id);
            }
        }
    }

    // Do not send any further loss probes on this path.
    self.spaces[SpaceId::Data].for_path(path_id).loss_probes = 0;

    debug_assert!(!self.state.is_drained());
    self.endpoint_events
        .push_back(EndpointEventInner::RetireResetToken(path_id));

    self.abandoned_paths.insert(path_id);

    // Stop every per-path timer except loss detection and pacing. The match is
    // exhaustive on purpose, so adding a timer variant forces a decision here.
    for timer in timer::PathTimer::VALUES {
        let keep_timer = match timer {
            PathTimer::PathValidationFailed
            | PathTimer::PathChallengeLost
            | PathTimer::AbandonFromValidation => false,
            PathTimer::PathKeepAlive | PathTimer::PathIdle => false,
            PathTimer::MaxAckDelay => false,
            PathTimer::PathDrained => false,
            PathTimer::LossDetection => true,
            PathTimer::Pacing => true,
        };

        if !keep_timer {
            let qlog = self.qlog.with_time(now);
            self.timers.stop(Timer::PerPath(path_id, timer), qlog);
        }
    }

    // Recompute the loss detection timer now that the path's state has changed.
    self.set_loss_detection_timer(now, path_id);

    self.events.push_back(Event::Path(PathEvent::Abandoned {
        id: path_id,
        reason,
    }));
}
765
766 #[track_caller]
770 fn path_data(&self, path_id: PathId) -> &PathData {
771 if let Some(data) = self.paths.get(&path_id) {
772 &data.data
773 } else {
774 panic!(
775 "unknown path: {path_id}, currently known paths: {:?}",
776 self.paths.keys().collect::<Vec<_>>()
777 );
778 }
779 }
780
781 fn path(&self, path_id: PathId) -> Option<&PathData> {
783 self.paths.get(&path_id).map(|path_state| &path_state.data)
784 }
785
786 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
788 self.paths
789 .get_mut(&path_id)
790 .map(|path_state| &mut path_state.data)
791 }
792
793 pub fn paths(&self) -> Vec<PathId> {
797 self.paths.keys().copied().collect()
798 }
799
800 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
802 self.path(path_id)
803 .map(PathData::local_status)
804 .ok_or(ClosedPath { _private: () })
805 }
806
807 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
809 self.path(path_id)
810 .map(|path| path.network_path)
811 .ok_or(ClosedPath { _private: () })
812 }
813
814 pub fn set_path_status(
818 &mut self,
819 path_id: PathId,
820 status: PathStatus,
821 ) -> Result<PathStatus, SetPathStatusError> {
822 if !self.is_multipath_negotiated() {
823 return Err(SetPathStatusError::MultipathNotNegotiated);
824 }
825 let path = self
826 .path_mut(path_id)
827 .ok_or(SetPathStatusError::ClosedPath)?;
828 let prev = match path.status.local_update(status) {
829 Some(prev) => {
830 self.spaces[SpaceId::Data]
831 .pending
832 .path_status
833 .insert(path_id);
834 prev
835 }
836 None => path.local_status(),
837 };
838 Ok(prev)
839 }
840
841 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
846 self.path(path_id).and_then(|path| path.remote_status())
847 }
848
849 pub fn set_path_max_idle_timeout(
858 &mut self,
859 now: Instant,
860 path_id: PathId,
861 timeout: Option<Duration>,
862 ) -> Result<Option<Duration>, ClosedPath> {
863 let path = self
864 .paths
865 .get_mut(&path_id)
866 .ok_or(ClosedPath { _private: () })?;
867 let prev = std::mem::replace(&mut path.data.idle_timeout, timeout);
868
869 if !self.state.is_closed() {
871 if let Some(new_timeout) = timeout {
872 let timer = Timer::PerPath(path_id, PathTimer::PathIdle);
873 let deadline = match (prev, self.timers.get(timer)) {
874 (Some(old_timeout), Some(old_deadline)) => {
875 let last_activity = old_deadline.checked_sub(old_timeout).unwrap_or(now);
876 last_activity + new_timeout
877 }
878 _ => now + new_timeout,
879 };
880 self.timers.set(timer, deadline, self.qlog.with_time(now));
881 } else {
882 self.timers.stop(
883 Timer::PerPath(path_id, PathTimer::PathIdle),
884 self.qlog.with_time(now),
885 );
886 }
887 }
888
889 Ok(prev)
890 }
891
892 pub fn set_path_keep_alive_interval(
898 &mut self,
899 path_id: PathId,
900 interval: Option<Duration>,
901 ) -> Result<Option<Duration>, ClosedPath> {
902 let path = self
903 .paths
904 .get_mut(&path_id)
905 .ok_or(ClosedPath { _private: () })?;
906 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
907 }
908
909 #[track_caller]
913 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
914 &mut self.paths.get_mut(&path_id).expect("known path").data
915 }
916
917 fn find_validated_path_on_network_path(
921 &self,
922 network_path: FourTuple,
923 ) -> Option<(&PathId, &PathState)> {
924 self.paths.iter().find(|(path_id, path_state)| {
925 path_state.data.validated
926 && network_path.is_probably_same_path(&path_state.data.network_path)
928 && !self.abandoned_paths.contains(path_id)
929 })
930 }
934
/// Returns the data of path `path_id`, creating the path on `network_path` if it does not
/// exist yet.
///
/// When the path already exists it is returned untouched and the other arguments are
/// ignored. Otherwise a new path is created:
///
/// - it inherits validation and a conservative RTT from an existing validated,
///   non-abandoned path on (probably) the same network path, if there is one;
/// - a path challenge is marked as pending for it;
/// - a packet number space is created for it, with `pn` (if given) recorded in the
///   space's dedup state;
/// - if no remote CIDs exist for the path, a CIDs-blocked signal is queued.
fn ensure_path(
    &mut self,
    path_id: PathId,
    network_path: FourTuple,
    now: Instant,
    pn: Option<u64>,
) -> &mut PathData {
    // Gather everything derived from other paths before taking the map entry, which
    // borrows `self.paths` mutably for the rest of the function.
    let valid_path = self.find_validated_path_on_network_path(network_path);
    let validated = valid_path.is_some();
    let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
    let vacant_entry = match self.paths.entry(path_id) {
        btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
        btree_map::Entry::Occupied(occupied_entry) => {
            return &mut occupied_entry.into_mut().data;
        }
    };

    debug!(%validated, %path_id, %network_path, "path added");

    // A path is about to exist again, so the "no available path" grace timer is moot.
    self.timers.stop(
        Timer::Conn(ConnTimer::NoAvailablePath),
        self.qlog.with_time(now),
    );
    // Clamp the peer's limit to what fits in a u16.
    let peer_max_udp_payload_size =
        u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
    self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
    let mut data = PathData::new(
        network_path,
        self.allow_mtud,
        Some(peer_max_udp_payload_size),
        self.path_generation_counter,
        now,
        &self.config,
    );

    data.validated = validated;
    if let Some(initial_rtt) = initial_rtt {
        data.rtt.reset_initial_rtt(initial_rtt);
    }

    // New paths always get a path challenge queued, even when validation was inherited.
    data.pending_on_path_challenge = true;

    let path = vacant_entry.insert(PathState { data, prev: None });

    let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
    if let Some(pn) = pn {
        pn_space.dedup.insert(pn);
    }
    self.spaces[SpaceId::Data]
        .number_spaces
        .insert(path_id, pn_space);
    self.qlog.emit_tuple_assigned(path_id, network_path, now);

    if !self.remote_cids.contains_key(&path_id) {
        debug!(%path_id, "Remote opened path without issuing CIDs");
        self.spaces[SpaceId::Data]
            .pending
            .path_cids_blocked
            .insert(path_id);
    }

    &mut path.data
}
1009
1010 #[must_use]
1020 pub fn poll_transmit(
1021 &mut self,
1022 now: Instant,
1023 max_datagrams: NonZeroUsize,
1024 buf: &mut Vec<u8>,
1025 ) -> Option<Transmit> {
1026 let max_datagrams = match self.config.enable_segmentation_offload {
1027 false => NonZeroUsize::MIN,
1028 true => max_datagrams,
1029 };
1030
1031 let connection_close_pending = match self.state.as_type() {
1037 StateType::Drained => {
1038 for path in self.paths.values_mut() {
1039 path.data.app_limited = true;
1040 }
1041 return None;
1042 }
1043 StateType::Draining | StateType::Closed => {
1044 if !self.connection_close_pending {
1047 for path in self.paths.values_mut() {
1048 path.data.app_limited = true;
1049 }
1050 return None;
1051 }
1052 true
1053 }
1054 _ => false,
1055 };
1056
1057 if let Some(config) = &self.config.ack_frequency_config {
1059 let rtt = self
1060 .paths
1061 .values()
1062 .map(|p| p.data.rtt.get())
1063 .min()
1064 .expect("one path exists");
1065 self.spaces[SpaceId::Data].pending.ack_frequency = self
1066 .ack_frequency
1067 .should_send_ack_frequency(rtt, config, &self.peer_params)
1068 && self.highest_space == SpaceKind::Data
1069 && self.peer_supports_ack_frequency();
1070 }
1071
1072 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1073 while let Some(path_id) = next_path_id {
1074 if !connection_close_pending
1075 && let Some(transmit) = self.poll_transmit_off_path(now, buf, path_id)
1076 {
1077 return Some(transmit);
1078 }
1079
1080 let info = self.scheduling_info(path_id);
1081 if let Some(transmit) = self.poll_transmit_on_path(
1082 now,
1083 buf,
1084 path_id,
1085 max_datagrams,
1086 &info,
1087 connection_close_pending,
1088 ) {
1089 return Some(transmit);
1090 }
1091
1092 debug_assert!(
1095 buf.is_empty(),
1096 "nothing to send on path but buffer not empty"
1097 );
1098
1099 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1100 }
1101
1102 debug_assert!(
1104 buf.is_empty(),
1105 "there was data in the buffer, but it was not sent"
1106 );
1107
1108 if self.state.is_established() {
1109 let mut next_path_id = self.paths.first_entry().map(|e| *e.key());
1111 while let Some(path_id) = next_path_id {
1112 if let Some(transmit) = self.poll_transmit_mtu_probe(now, buf, path_id) {
1113 return Some(transmit);
1114 }
1115 next_path_id = self.paths.keys().find(|i| **i > path_id).copied();
1116 }
1117 }
1118
1119 None
1120 }
1121
1122 fn scheduling_info(&self, path_id: PathId) -> PathSchedulingInfo {
1140 let have_validated_status_available_space = self.paths.iter().any(|(path_id, path)| {
1142 self.remote_cids.contains_key(path_id)
1143 && !self.abandoned_paths.contains(path_id)
1144 && path.data.validated
1145 && path.data.local_status() == PathStatus::Available
1146 });
1147
1148 let have_validated_space = self.paths.iter().any(|(path_id, path)| {
1150 self.remote_cids.contains_key(path_id)
1151 && !self.abandoned_paths.contains(path_id)
1152 && path.data.validated
1153 });
1154
1155 let is_handshaking = self.is_handshaking();
1156 let has_cids = self.remote_cids.contains_key(&path_id);
1157 let is_abandoned = self.abandoned_paths.contains(&path_id);
1158 let path_data = self.path_data(path_id);
1159 let validated = path_data.validated;
1160 let status = path_data.local_status();
1161
1162 let may_send_data = has_cids
1165 && !is_abandoned
1166 && if is_handshaking {
1167 true
1171 } else if !validated {
1172 false
1179 } else {
1180 match status {
1181 PathStatus::Available => {
1182 true
1184 }
1185 PathStatus::Backup => {
1186 !have_validated_status_available_space
1188 }
1189 }
1190 };
1191
1192 let may_send_close = has_cids
1197 && !is_abandoned
1198 && if !validated && have_validated_status_available_space {
1199 false
1201 } else {
1202 true
1204 };
1205
1206 let may_self_abandon = has_cids && validated && !have_validated_space;
1210
1211 PathSchedulingInfo {
1212 is_abandoned,
1213 may_send_data,
1214 may_send_close,
1215 may_self_abandon,
1216 }
1217 }
1218
1219 fn build_transmit(&mut self, path_id: PathId, transmit: TransmitBuf<'_>) -> Transmit {
1220 debug_assert!(
1221 !transmit.is_empty(),
1222 "must not be called with an empty transmit buffer"
1223 );
1224
1225 let network_path = self.path_data(path_id).network_path;
1226 trace!(
1227 segment_size = transmit.segment_size(),
1228 last_datagram_len = transmit.len() % transmit.segment_size(),
1229 %network_path,
1230 "sending {} bytes in {} datagrams",
1231 transmit.len(),
1232 transmit.num_datagrams()
1233 );
1234 self.path_data_mut(path_id)
1235 .inc_total_sent(transmit.len() as u64);
1236
1237 self.path_stats
1238 .for_path(path_id)
1239 .udp_tx
1240 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1241
1242 Transmit {
1243 destination: network_path.remote,
1244 size: transmit.len(),
1245 ecn: if self.path_data(path_id).sending_ecn {
1246 Some(EcnCodepoint::Ect0)
1247 } else {
1248 None
1249 },
1250 segment_size: match transmit.num_datagrams() {
1251 1 => None,
1252 _ => Some(transmit.segment_size()),
1253 },
1254 src_ip: network_path.local_ip,
1255 }
1256 }
1257
1258 fn poll_transmit_off_path(
1260 &mut self,
1261 now: Instant,
1262 buf: &mut Vec<u8>,
1263 path_id: PathId,
1264 ) -> Option<Transmit> {
1265 if let Some(challenge) = self.send_prev_path_challenge(now, buf, path_id) {
1266 return Some(challenge);
1267 }
1268 if let Some(response) = self.send_off_path_path_response(now, buf, path_id) {
1269 return Some(response);
1270 }
1271 if let Some(challenge) = self.send_nat_traversal_path_challenge(now, buf, path_id) {
1272 return Some(challenge);
1273 }
1274 None
1275 }
1276
1277 #[must_use]
/// Builds as many packets as possible for `path_id` across the packet spaces and returns
/// the resulting [`Transmit`], if any packet was written.
///
/// Spaces are visited in `SpaceId::iter()` order. Only path zero uses the non-data
/// spaces. Returns `None` without sending when the path has no remote CID, or when no
/// space produced a packet.
///
/// As side effects the path's `app_limited` flag is updated, recovery metrics are
/// emitted to qlog when something was sent or sending was congestion blocked, and the
/// congestion controller is informed of the bytes sent.
///
/// # Panics
///
/// Panics if the path has remote CIDs but `path_id` is unknown.
fn poll_transmit_on_path(
    &mut self,
    now: Instant,
    buf: &mut Vec<u8>,
    path_id: PathId,
    max_datagrams: NonZeroUsize,
    scheduling_info: &PathSchedulingInfo,
    connection_close_pending: bool,
) -> Option<Transmit> {
    let Some(remote_cid) = self.remote_cids.get(&path_id).map(CidQueue::active) else {
        // Missing CIDs are expected for abandoned paths, so only log for live ones.
        if !self.abandoned_paths.contains(&path_id) {
            debug!(%path_id, "no remote CIDs for path");
        }
        return None;
    };

    // Padding requirement carried over from one space to the next within a datagram.
    let mut pad_datagram = PadDatagram::No;

    // Packet number of the last packet written, if any packet was written at all.
    let mut last_packet_number = None;

    // Whether any space wanted to send but was blocked by congestion control.
    let mut congestion_blocked = false;

    let pmtu = self.path_data(path_id).current_mtu().into();
    let mut transmit = TransmitBuf::new(buf, max_datagrams, pmtu);

    for space_id in SpaceId::iter() {
        // Only the data space is used on paths other than path zero.
        if path_id != PathId::ZERO && space_id != SpaceId::Data {
            continue;
        }
        match self.poll_transmit_path_space(
            now,
            &mut transmit,
            path_id,
            space_id,
            remote_cid,
            scheduling_info,
            connection_close_pending,
            pad_datagram,
        ) {
            PollPathSpaceStatus::NothingToSend {
                congestion_blocked: cb,
            } => {
                congestion_blocked |= cb;
            }
            // A packet was written but the next space may still add to the transmit.
            PollPathSpaceStatus::WrotePacket {
                last_packet_number: pn,
                pad_datagram: pad,
            } => {
                debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                last_packet_number = Some(pn);
                pad_datagram = pad;
                continue;
            }
            // The transmit is complete and must be sent as is.
            PollPathSpaceStatus::Send {
                last_packet_number: pn,
            } => {
                debug_assert!(!transmit.is_empty(), "transmit must contain packets");
                last_packet_number = Some(pn);
                break;
            }
        }
    }

    if last_packet_number.is_some() || congestion_blocked {
        self.qlog.emit_recovery_metrics(
            path_id,
            &mut self.paths.get_mut(&path_id).unwrap().data,
            now,
        );
    }

    // The path is app-limited only if nothing was sent and congestion was not the reason.
    self.path_data_mut(path_id).app_limited =
        last_packet_number.is_none() && !congestion_blocked;

    match last_packet_number {
        Some(last_packet_number) => {
            self.path_data_mut(path_id).congestion.on_sent(
                now,
                transmit.len() as u64,
                last_packet_number,
            );
            Some(self.build_transmit(path_id, transmit))
        }
        None => None,
    }
}
1392
1393 #[must_use]
1395 fn poll_transmit_path_space(
1396 &mut self,
1397 now: Instant,
1398 transmit: &mut TransmitBuf<'_>,
1399 path_id: PathId,
1400 space_id: SpaceId,
1401 remote_cid: ConnectionId,
1402 scheduling_info: &PathSchedulingInfo,
1403 connection_close_pending: bool,
1405 mut pad_datagram: PadDatagram,
1407 ) -> PollPathSpaceStatus {
1408 let mut last_packet_number = None;
1411
1412 loop {
1428 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1430 transmit.datagram_remaining_mut()
1432 } else {
1433 transmit.segment_size()
1435 };
1436 let can_send =
1437 self.space_can_send(space_id, path_id, max_packet_size, connection_close_pending);
1438 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1439 let space_will_send = {
1440 if scheduling_info.is_abandoned {
1441 scheduling_info.may_self_abandon
1446 && self.spaces[space_id]
1447 .pending
1448 .path_abandon
1449 .contains_key(&path_id)
1450 } else if can_send.close && scheduling_info.may_send_close {
1451 true
1453 } else if needs_loss_probe || can_send.space_specific {
1454 true
1457 } else {
1458 !can_send.is_empty() && scheduling_info.may_send_data
1461 }
1462 };
1463
1464 if !space_will_send {
1465 return match last_packet_number {
1468 Some(pn) => PollPathSpaceStatus::WrotePacket {
1469 last_packet_number: pn,
1470 pad_datagram,
1471 },
1472 None => {
1473 if self.crypto_state.has_keys(space_id.encryption_level())
1475 || (space_id == SpaceId::Data
1476 && self.crypto_state.has_keys(EncryptionLevel::ZeroRtt))
1477 {
1478 trace!(?space_id, %path_id, "nothing to send in space");
1479 }
1480 PollPathSpaceStatus::NothingToSend {
1481 congestion_blocked: false,
1482 }
1483 }
1484 };
1485 }
1486
1487 if transmit.datagram_remaining_mut() == 0 {
1491 let congestion_blocked =
1492 self.path_congestion_check(space_id, path_id, transmit, &can_send, now);
1493 if congestion_blocked != PathBlocked::No {
1494 return match last_packet_number {
1496 Some(pn) => PollPathSpaceStatus::WrotePacket {
1497 last_packet_number: pn,
1498 pad_datagram,
1499 },
1500 None => {
1501 return PollPathSpaceStatus::NothingToSend {
1502 congestion_blocked: true,
1503 };
1504 }
1505 };
1506 }
1507
1508 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1511 return match last_packet_number {
1514 Some(pn) => PollPathSpaceStatus::WrotePacket {
1515 last_packet_number: pn,
1516 pad_datagram,
1517 },
1518 None => {
1519 return PollPathSpaceStatus::NothingToSend {
1520 congestion_blocked: false,
1521 };
1522 }
1523 };
1524 }
1525
1526 if needs_loss_probe {
1527 let request_immediate_ack =
1529 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1530 self.spaces[space_id].queue_tail_loss_probe(
1531 path_id,
1532 request_immediate_ack,
1533 &self.streams,
1534 );
1535
1536 self.spaces[space_id].for_path(path_id).loss_probes -= 1; transmit.start_new_datagram_with_size(std::cmp::min(
1542 usize::from(INITIAL_MTU),
1543 transmit.segment_size(),
1544 ));
1545 } else {
1546 transmit.start_new_datagram();
1547 }
1548 trace!(count = transmit.num_datagrams(), "new datagram started");
1549
1550 pad_datagram = PadDatagram::No;
1552 }
1553
1554 if transmit.datagram_start_offset() < transmit.len() {
1557 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1558 }
1559
1560 if self.crypto_state.has_keys(EncryptionLevel::Initial)
1565 && space_id == SpaceId::Handshake
1566 && self.side.is_client()
1567 {
1568 self.discard_space(now, SpaceKind::Initial);
1571 }
1572 if let Some(ref mut prev) = self.crypto_state.prev_crypto {
1573 prev.update_unacked = false;
1574 }
1575
1576 let Some(mut builder) =
1577 PacketBuilder::new(now, space_id, path_id, remote_cid, transmit, self)
1578 else {
1579 return PollPathSpaceStatus::NothingToSend {
1586 congestion_blocked: false,
1587 };
1588 };
1589 last_packet_number = Some(builder.packet_number);
1590
1591 if space_id == SpaceId::Initial
1592 && (self.side.is_client() || can_send.is_ack_eliciting())
1593 {
1594 pad_datagram |= PadDatagram::ToMinMtu;
1596 }
1597 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1598 pad_datagram |= PadDatagram::ToSegmentSize;
1599 }
1600
1601 if scheduling_info.may_send_close && can_send.close {
1602 trace!("sending CONNECTION_CLOSE");
1603 let is_multipath_negotiated = self.is_multipath_negotiated();
1608 for path_id in self.spaces[space_id]
1609 .number_spaces
1610 .iter()
1611 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1612 .map(|(&path_id, _)| path_id)
1613 .collect::<Vec<_>>()
1614 {
1615 Self::populate_acks(
1616 now,
1617 self.receiving_ecn,
1618 path_id,
1619 space_id,
1620 &mut self.spaces[space_id],
1621 is_multipath_negotiated,
1622 &mut builder,
1623 &mut self.path_stats.for_path(path_id).frame_tx,
1624 self.crypto_state.has_keys(space_id.encryption_level()),
1625 );
1626 }
1627
1628 debug_assert!(
1636 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1637 "ACKs should leave space for ConnectionClose"
1638 );
1639 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
1640 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1641 let max_frame_size = builder.frame_space_remaining();
1642 let close: Close = match self.state.as_type() {
1643 StateType::Closed => {
1644 let reason: Close =
1645 self.state.as_closed().expect("checked").clone().into();
1646 if space_id == SpaceId::Data || reason.is_transport_layer() {
1647 reason
1648 } else {
1649 TransportError::APPLICATION_ERROR("").into()
1650 }
1651 }
1652 StateType::Draining => TransportError::NO_ERROR("").into(),
1653 _ => unreachable!(
1654 "tried to make a close packet when the connection wasn't closed"
1655 ),
1656 };
1657 builder.write_frame(close.encoder(max_frame_size), stats);
1658 }
1659 let last_pn = builder.packet_number;
1660 builder.finish_and_track(now, self, path_id, pad_datagram);
1661 if space_id.kind() == self.highest_space {
1662 self.connection_close_pending = false;
1665 }
1666 return PollPathSpaceStatus::WrotePacket {
1679 last_packet_number: last_pn,
1680 pad_datagram,
1681 };
1682 }
1683
1684 self.populate_packet(now, space_id, path_id, scheduling_info, &mut builder);
1685
1686 debug_assert!(
1693 !(builder.sent_frames().is_ack_only(&self.streams)
1694 && !can_send.acks
1695 && (can_send.other || can_send.space_specific)
1696 && builder.buf.segment_size()
1697 == self.path_data(path_id).current_mtu() as usize
1698 && self.datagrams.outgoing.is_empty()),
1699 "SendableFrames was {can_send:?}, but only ACKs have been written"
1700 );
1701 if builder.sent_frames().requires_padding {
1702 pad_datagram |= PadDatagram::ToMinMtu;
1703 }
1704
1705 for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1706 self.spaces[space_id]
1707 .for_path(*path_id)
1708 .pending_acks
1709 .acks_sent();
1710 self.timers.stop(
1711 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1712 self.qlog.with_time(now),
1713 );
1714 }
1715
1716 if builder.can_coalesce && path_id == PathId::ZERO && {
1724 let max_packet_size = builder
1725 .buf
1726 .datagram_remaining_mut()
1727 .saturating_sub(builder.predict_packet_end());
1728 max_packet_size > MIN_PACKET_SPACE
1729 && self.has_pending_packet(space_id, max_packet_size, connection_close_pending)
1730 } {
1731 trace!("will coalesce with next packet");
1734 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1735 } else {
1736 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1742 const MAX_PADDING: usize = 32;
1750 if builder.buf.datagram_remaining_mut()
1751 > builder.predict_packet_end() + MAX_PADDING
1752 {
1753 trace!(
1754 "GSO truncated by demand for {} padding bytes",
1755 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1756 );
1757 let last_pn = builder.packet_number;
1758 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1759 return PollPathSpaceStatus::Send {
1760 last_packet_number: last_pn,
1761 };
1762 }
1763
1764 builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1767 } else {
1768 builder.finish_and_track(now, self, path_id, pad_datagram);
1769 }
1770
1771 if transmit.num_datagrams() == 1 {
1774 transmit.clip_segment_size();
1775 }
1776 }
1777 }
1778 }
1779
1780 fn poll_transmit_mtu_probe(
1781 &mut self,
1782 now: Instant,
1783 buf: &mut Vec<u8>,
1784 path_id: PathId,
1785 ) -> Option<Transmit> {
1786 let (active_cid, probe_size) = self.get_mtu_probe_data(now, path_id)?;
1787
1788 let mut transmit = TransmitBuf::new(buf, NonZeroUsize::MIN, probe_size as usize);
1790 transmit.start_new_datagram_with_size(probe_size as usize);
1791
1792 let mut builder =
1793 PacketBuilder::new(now, SpaceId::Data, path_id, active_cid, &mut transmit, self)?;
1794
1795 trace!(?probe_size, "writing MTUD probe");
1797 builder.write_frame(frame::Ping, &mut self.path_stats.for_path(path_id).frame_tx);
1798
1799 if self.peer_supports_ack_frequency() {
1801 builder.write_frame(
1802 frame::ImmediateAck,
1803 &mut self.path_stats.for_path(path_id).frame_tx,
1804 );
1805 }
1806
1807 builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1808
1809 self.path_stats.for_path(path_id).sent_plpmtud_probes += 1;
1810
1811 Some(self.build_transmit(path_id, transmit))
1812 }
1813
1814 fn get_mtu_probe_data(&mut self, now: Instant, path_id: PathId) -> Option<(ConnectionId, u16)> {
1822 let active_cid = self.remote_cids.get(&path_id).map(CidQueue::active)?;
1823 let is_eligible = self.path_data(path_id).validated
1824 && !self.path_data(path_id).is_validating_path()
1825 && !self.abandoned_paths.contains(&path_id);
1826
1827 if !is_eligible {
1828 return None;
1829 }
1830 let next_pn = self.spaces[SpaceId::Data]
1831 .for_path(path_id)
1832 .peek_tx_number();
1833 let probe_size = self
1834 .path_data_mut(path_id)
1835 .mtud
1836 .poll_transmit(now, next_pn)?;
1837
1838 Some((active_cid, probe_size))
1839 }
1840
1841 fn has_pending_packet(
1858 &mut self,
1859 current_space_id: SpaceId,
1860 max_packet_size: usize,
1861 connection_close_pending: bool,
1862 ) -> bool {
1863 let mut space_id = current_space_id;
1864 loop {
1865 let can_send = self.space_can_send(
1866 space_id,
1867 PathId::ZERO,
1868 max_packet_size,
1869 connection_close_pending,
1870 );
1871 if !can_send.is_empty() {
1872 return true;
1873 }
1874 match space_id.next() {
1875 Some(next_space_id) => space_id = next_space_id,
1876 None => break,
1877 }
1878 }
1879 false
1880 }
1881
1882 fn path_congestion_check(
1884 &mut self,
1885 space_id: SpaceId,
1886 path_id: PathId,
1887 transmit: &TransmitBuf<'_>,
1888 can_send: &SendableFrames,
1889 now: Instant,
1890 ) -> PathBlocked {
1891 if self.side().is_server()
1897 && self
1898 .path_data(path_id)
1899 .anti_amplification_blocked(transmit.len() as u64 + 1)
1900 {
1901 trace!(?space_id, %path_id, "blocked by anti-amplification");
1902 return PathBlocked::AntiAmplification;
1903 }
1904
1905 let bytes_to_send = transmit.segment_size() as u64;
1908 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1909
1910 if can_send.other && !need_loss_probe && !can_send.close {
1911 let path = self.path_data(path_id);
1912 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1913 trace!(
1914 ?space_id,
1915 %path_id,
1916 in_flight=%path.in_flight.bytes,
1917 congestion_window=%path.congestion.window(),
1918 "blocked by congestion control",
1919 );
1920 return PathBlocked::Congestion;
1921 }
1922 }
1923
1924 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1926 let resume_time = now + delay;
1927 self.timers.set(
1928 Timer::PerPath(path_id, PathTimer::Pacing),
1929 resume_time,
1930 self.qlog.with_time(now),
1931 );
1932 trace!(?space_id, %path_id, ?delay, "blocked by pacing");
1935 return PathBlocked::Pacing;
1936 }
1937
1938 PathBlocked::No
1939 }
1940
/// Sends a PATH_CHALLENGE on the previous network path of `path_id`, if one is pending.
///
/// Returns `None` if the path is unknown, has no previous path recorded, no challenge is
/// pending on the previous path, or no packet builder could be created. Otherwise a
/// single padded datagram is written to `buf` and returned as a [`Transmit`] addressed
/// to the previous path's remote.
///
/// NOTE(review): the pending flag is cleared and the challenge is recorded before the
/// packet builder is created, so a `None` from `PacketBuilder::new` drops this attempt.
fn send_prev_path_challenge(
    &mut self,
    now: Instant,
    buf: &mut Vec<u8>,
    path_id: PathId,
) -> Option<Transmit> {
    let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
    if !prev_path.pending_on_path_challenge {
        return None;
    };
    prev_path.pending_on_path_challenge = false;
    // Fresh random token for this challenge, recorded against the previous path.
    let token = self.rng.random();
    let network_path = prev_path.network_path;
    prev_path.record_path_challenge_sent(now, token, network_path);

    debug_assert_eq!(
        self.highest_space,
        SpaceKind::Data,
        "PATH_CHALLENGE queued without 1-RTT keys"
    );
    // Single-datagram buffer with a segment size of MIN_INITIAL_SIZE.
    let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
    buf.start_new_datagram();

    // The packet uses the connection ID stored alongside the previous path.
    let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, self)?;
    let challenge = frame::PathChallenge(token);
    let stats = &mut self.path_stats.for_path(path_id).frame_tx;
    builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));

    // Pad the packet up to MIN_INITIAL_SIZE.
    builder.pad_to(MIN_INITIAL_SIZE);

    builder.finish(self, now);
    // Account for the one datagram that was produced.
    self.path_stats
        .for_path(path_id)
        .udp_tx
        .on_sent(1, buf.len());

    // Addressed to the previous path's 4-tuple, without ECN or segmentation.
    Some(Transmit {
        destination: network_path.remote,
        size: buf.len(),
        ecn: None,
        segment_size: None,
        src_ip: network_path.local_ip,
    })
}
1998
/// Sends a PATH_RESPONSE for a challenge that arrived from a different network path.
///
/// Returns `None` if the path or its remote CID queue is unknown, if
/// `pop_off_path` has no response queued, or if no packet builder could be created.
/// Otherwise a single padded datagram is written to `buf` and returned as a [`Transmit`]
/// addressed to the network path returned by `pop_off_path`.
fn send_off_path_path_response(
    &mut self,
    now: Instant,
    buf: &mut Vec<u8>,
    path_id: PathId,
) -> Option<Transmit> {
    let path = self.paths.get_mut(&path_id).map(|state| &mut state.data)?;
    let cid_queue = self.remote_cids.get_mut(&path_id)?;
    // Take one queued response, given the path's current network path for comparison.
    let (token, network_path) = path.path_responses.pop_off_path(path.network_path)?;

    // Prefer a reserved CID; fall back to the active one when none is available.
    let cid = cid_queue
        .next_reserved()
        .unwrap_or_else(|| cid_queue.active());
    let frame = frame::PathResponse(token);

    // Single-datagram buffer with a segment size of MIN_INITIAL_SIZE.
    let buf = &mut TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
    buf.start_new_datagram();

    let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, buf, self)?;
    let stats = &mut self.path_stats.for_path(path_id).frame_tx;
    builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
    // Pad the packet up to MIN_INITIAL_SIZE.
    builder.pad_to(MIN_INITIAL_SIZE);
    builder.finish(self, now);

    let size = buf.len();

    // Account for the one datagram that was produced.
    self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);
    Some(Transmit {
        destination: network_path.remote,
        size,
        ecn: None,
        segment_size: None,
        src_ip: network_path.local_ip,
    })
}
2039
/// Sends a PATH_CHALLENGE to the next NAT traversal probe address, using `path_id`.
///
/// Returns `None` if the server-side NAT traversal state is unavailable, it has no probe
/// address, the path is unknown or not validated, the path has no remote CID queue, or
/// no packet builder could be created. Otherwise a single padded datagram is written to
/// `buf` and returned as a [`Transmit`] addressed to the probe address.
fn send_nat_traversal_path_challenge(
    &mut self,
    now: Instant,
    buf: &mut Vec<u8>,
    path_id: PathId,
) -> Option<Transmit> {
    // `server_side_mut()` is fallible; `.ok()?` bails out when it returns an error.
    let remote = self
        .n0_nat_traversal
        .server_side_mut()
        .ok()?
        .next_probe_addr()?;

    // Probes are only sent using a validated path.
    if !self.paths.get(&path_id)?.data.validated {
        return None;
    }

    let Some(cid) = self
        .remote_cids
        .get(&path_id)
        .map(|cid_queue| cid_queue.active())
    else {
        trace!(%path_id, "Not sending NAT traversal probe for path with no CIDs");
        return None;
    };
    // Fresh random token for this challenge.
    let token = self.rng.random();

    let frame = frame::PathChallenge(token);

    // Single-datagram buffer with a segment size of MIN_INITIAL_SIZE.
    let mut buf = TransmitBuf::new(buf, NonZeroUsize::MIN, MIN_INITIAL_SIZE.into());
    buf.start_new_datagram();

    let mut builder = PacketBuilder::new(now, SpaceId::Data, path_id, cid, &mut buf, self)?;
    let stats = &mut self.path_stats.for_path(path_id).frame_tx;
    builder.write_frame_with_log_msg(frame, stats, Some("(nat-traversal)"));
    // Pad the packet up to MIN_INITIAL_SIZE.
    builder.pad_to(MIN_INITIAL_SIZE);
    builder.finish(self, now);

    // Tell the NAT traversal state that this address has now been probed.
    if let Ok(server_state) = self.n0_nat_traversal.server_side_mut() {
        server_state.mark_probe_sent((remote.ip(), remote.port()));
    }

    // The path was looked up successfully above, hence the `expect`.
    let path = &mut self.paths.get_mut(&path_id).expect("checked").data;
    // The challenge is recorded against the probed remote with no local IP set.
    let network_path = FourTuple {
        remote,
        local_ip: None,
    };

    path.record_path_challenge_sent(now, token, network_path);

    // While retries are pending, arm the retry timer one PTO (computed from the
    // configured initial RTT) from now.
    if let Ok(server_state) = self.n0_nat_traversal.server_side_mut()
        && server_state.has_pending_retries()
    {
        let initial_pto = RttEstimator::new(self.config.initial_rtt).pto_base();
        self.timers.set(
            Timer::Conn(ConnTimer::NatTraversalProbeRetry),
            now + initial_pto,
            self.qlog.with_time(now),
        );
    }

    let size = buf.len();

    // Account for the one datagram that was produced.
    self.path_stats.for_path(path_id).udp_tx.on_sent(1, size);

    Some(Transmit {
        destination: remote,
        size,
        ecn: None,
        segment_size: None,
        src_ip: None,
    })
}
2125
2126 fn space_can_send(
2134 &mut self,
2135 space_id: SpaceId,
2136 path_id: PathId,
2137 packet_size: usize,
2138 connection_close_pending: bool,
2139 ) -> SendableFrames {
2140 let space = &mut self.spaces[space_id];
2141 let space_has_crypto = self.crypto_state.has_keys(space_id.encryption_level());
2142
2143 if !space_has_crypto
2144 && (space_id != SpaceId::Data
2145 || !self.crypto_state.has_keys(EncryptionLevel::ZeroRtt)
2146 || self.side.is_server())
2147 {
2148 return SendableFrames::empty();
2150 }
2151
2152 let mut can_send = space.can_send(path_id, &self.streams);
2153
2154 if space_id == SpaceId::Data {
2156 let pn = space.for_path(path_id).peek_tx_number();
2157 let frame_space_1rtt =
2163 packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
2164 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
2165 }
2166
2167 can_send.close = connection_close_pending && space_has_crypto;
2168
2169 can_send
2170 }
2171
/// Processes a `ConnectionEvent` delivered to this connection.
///
/// Two event kinds are handled here: an incoming datagram (decoded and processed,
/// including any coalesced remainder) and a batch of newly issued local connection IDs
/// (recorded and queued for sending to the peer).
pub fn handle_event(&mut self, event: ConnectionEvent) {
    use ConnectionEventInner::*;
    match event.0 {
        Datagram(DatagramConnectionEvent {
            now,
            network_path,
            path_id,
            ecn,
            first_decode,
            remaining,
        }) => {
            let span = trace_span!("pkt", %path_id);
            let _guard = span.enter();

            // Drop the datagram if its 4-tuple is not acceptable for this path.
            if self.update_network_path_or_discard(network_path, path_id) {
                return;
            }

            // Sample the anti-amplification state before processing; a path that is
            // not known yet counts as not blocked.
            let was_anti_amplification_blocked = self
                .path(path_id)
                .map(|path| path.anti_amplification_blocked(1))
                .unwrap_or(false);

            // Receive statistics for the first packet of the datagram.
            let rx = &mut self.path_stats.for_path(path_id).udp_rx;
            rx.datagrams += 1;
            rx.bytes += first_decode.len() as u64;
            let data_len = first_decode.len();

            self.handle_decode(now, network_path, path_id, ecn, first_decode);
            // The path is looked up again after decoding, and only then credited
            // with the received bytes.
            if let Some(path) = self.path_mut(path_id) {
                path.inc_total_recvd(data_len as u64);
            }

            // Remaining bytes of the datagram hold coalesced packets.
            if let Some(data) = remaining {
                self.path_stats.for_path(path_id).udp_rx.bytes += data.len() as u64;
                self.handle_coalesced(now, network_path, path_id, ecn, data);
            }

            if let Some(path) = self.paths.get_mut(&path_id) {
                self.qlog
                    .emit_recovery_metrics(path_id, &mut path.data, now);
            }

            // If the path was blocked before this datagram arrived, (re)compute the
            // loss detection timer now.
            if was_anti_amplification_blocked {
                self.set_loss_detection_timer(now, path_id);
            }
        }
        NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
            // All IDs in one event are expected to belong to the same path; the
            // default path ID is used when the batch is empty.
            let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
            debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
            // Create the per-path CID state on first use.
            let cid_state = self
                .local_cid_state
                .entry(path_id)
                .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
            cid_state.new_cids(&ids, now);

            // Queue the IDs (in reverse order) to be announced to the peer.
            ids.into_iter().rev().for_each(|frame| {
                self.spaces[SpaceId::Data].pending.new_cids.push(frame);
            });
            self.reset_cid_retirement(now);
        }
    }
}
2251
/// Checks the 4-tuple of an incoming datagram against the known path `path_id`.
///
/// Returns `true` if the datagram must be discarded: either its remote differs from the
/// path's remote while remote migration is not allowed, or both local IPs are known,
/// differ, and this side is not a client. Otherwise returns `false`; in that case a
/// local IP carried by the datagram is stored on the path. When the path is unknown
/// nothing is checked and `false` is returned.
fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
    let remote_may_migrate = self.side.remote_may_migrate(&self.state);
    // Only a client tolerates a change of its own local IP here.
    let local_ip_may_migrate = self.side.is_client();
    if let Some(known_path) = self.path_mut(path_id) {
        // Remote address changed although the peer is not allowed to migrate.
        if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
            trace!(
                %path_id,
                %network_path,
                %known_path.network_path,
                "discarding packet from unrecognized peer"
            );
            return true;
        }

        // Both local IPs are known and differ, but local migration is not allowed.
        if known_path.network_path.local_ip.is_some()
            && network_path.local_ip.is_some()
            && known_path.network_path.local_ip != network_path.local_ip
            && !local_ip_may_migrate
        {
            trace!(
                %path_id,
                %network_path,
                %known_path.network_path,
                "discarding packet sent to incorrect interface"
            );
            return true;
        }
        // Adopt the datagram's local IP, logging when it replaces a different one.
        if let Some(local_ip) = network_path.local_ip {
            if known_path
                .network_path
                .local_ip
                .is_some_and(|ip| ip != local_ip)
            {
                debug!(
                    %path_id,
                    %network_path,
                    %known_path.network_path,
                    "path's local address seemingly migrated"
                );
            }
            known_path.network_path.local_ip = Some(local_ip);
        }
    }
    false
}
2314
/// Processes timer expirations.
///
/// Repeatedly takes the next timer that `expire_before(now, ..)` reports and runs the
/// handler for it, until no expired timer remains. Connection-wide timers and per-path
/// timers are dispatched separately.
pub fn handle_timeout(&mut self, now: Instant) {
    while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
        trace!(?timer, at=?now, "timeout");
        match timer {
            Timer::Conn(timer) => match timer {
                // Close timer: move to the drained state and notify the endpoint.
                ConnTimer::Close => {
                    self.state.move_to_drained(None);
                    self.endpoint_events.push_back(EndpointEventInner::Drained);
                }
                // Idle timeout: kill the connection with a `TimedOut` error.
                ConnTimer::Idle => {
                    self.kill(ConnectionError::TimedOut);
                }
                ConnTimer::KeepAlive => {
                    trace!("sending keep-alive");
                    self.ping();
                }
                ConnTimer::KeyDiscard => {
                    self.crypto_state.discard_temporary_keys();
                }
                // Process every CID retirement that is due at `now`.
                ConnTimer::PushNewCid => {
                    while let Some((path_id, when)) = self.next_cid_retirement() {
                        if when > now {
                            break;
                        }
                        match self.local_cid_state.get_mut(&path_id) {
                            None => error!(%path_id, "No local CID state for path"),
                            Some(cid_state) => {
                                let num_new_cid = cid_state.on_cid_timeout().into();
                                // Replacement CIDs are only requested while not closed.
                                if !self.state.is_closed() {
                                    trace!(
                                        "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                        cid_state.retire_prior_to()
                                    );
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::NeedIdentifiers(
                                            path_id,
                                            now,
                                            num_new_cid,
                                        ),
                                    );
                                }
                            }
                        }
                    }
                }
                // No path became available in time: close with NO_VIABLE_PATH,
                // unless the connection is already closed or drained.
                ConnTimer::NoAvailablePath => {
                    if self.state.is_closed() || self.state.is_drained() {
                        error!("no viable path timer fired, but connection already closing");
                    } else {
                        trace!("no viable path grace period expired, closing connection");
                        let err = TransportError::NO_VIABLE_PATH(
                            "last path abandoned, no new path opened",
                        );
                        self.close_common();
                        self.set_close_timer(now);
                        self.connection_close_pending = true;
                        self.state.move_to_closed(err);
                    }
                }
                // Ask the server-side NAT traversal state to re-queue its probes.
                ConnTimer::NatTraversalProbeRetry => {
                    if let Ok(server_state) = self.n0_nat_traversal.server_side_mut()
                        && server_state.queue_retries()
                    {
                        trace!("off-path probe retry timer fired, re-queued probes");
                    }
                }
            },
            Timer::PerPath(path_id, timer) => {
                let span = trace_span!("per-path timer fired", %path_id, ?timer);
                let _guard = span.enter();
                match timer {
                    // Path idle timeout: close the path, logging any failure.
                    PathTimer::PathIdle => {
                        if let Err(err) =
                            self.close_path_inner(now, path_id, PathAbandonReason::TimedOut)
                        {
                            warn!(?err, "failed closing path");
                        }
                    }

                    // A failure to queue the ping (closed path) is deliberately ignored.
                    PathTimer::PathKeepAlive => {
                        trace!("sending keep-alive on path");
                        self.ping_path(path_id).ok();
                    }
                    PathTimer::LossDetection => {
                        self.on_loss_detection_timeout(now, path_id);
                        // NOTE(review): this `unwrap()` panics if the path no longer
                        // exists; the other arms below skip a missing path instead.
                        // Confirm the path is guaranteed to exist here.
                        self.qlog.emit_recovery_metrics(
                            path_id,
                            &mut self.paths.get_mut(&path_id).unwrap().data,
                            now,
                        );
                    }
                    PathTimer::PathValidationFailed => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("path migration validation failed");
                        // Restore the previous path data, if any was kept.
                        if let Some((_, prev)) = path.prev.take() {
                            path.data = prev;
                        }
                        path.data.reset_on_path_challenges();
                    }
                    // Mark a new on-path challenge as pending.
                    PathTimer::PathChallengeLost => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        trace!("path challenge deemed lost");
                        path.data.pending_on_path_challenge = true;
                    }
                    // Validation of a new path failed: abandon the path.
                    PathTimer::AbandonFromValidation => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        path.data.reset_on_path_challenges();
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("new path validation failed");
                        if let Err(err) = self.close_path_inner(
                            now,
                            path_id,
                            PathAbandonReason::ValidationFailed,
                        ) {
                            warn!(?err, "failed closing path");
                        }
                    }
                    // Nothing to do beyond logging when the pacing timer expires.
                    PathTimer::Pacing => trace!("pacing timer expired"),
                    // Always handled against the Data space's pending ACKs.
                    PathTimer::MaxAckDelay => {
                        trace!("max ack delay reached");
                        self.spaces[SpaceId::Data]
                            .for_path(path_id)
                            .pending_acks
                            .on_max_ack_delay_timeout()
                    }
                    // Tear down the remaining per-path state.
                    PathTimer::PathDrained => {
                        self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                        // Retire every still-active local CID sequence number of the path.
                        if let Some(local_cid_state) = self.local_cid_state.remove(&path_id) {
                            debug_assert!(!self.state.is_drained());
                            let (min_seq, max_seq) = local_cid_state.active_seq();
                            for seq in min_seq..=max_seq {
                                self.endpoint_events.push_back(
                                    EndpointEventInner::RetireConnectionId(
                                        now, path_id, seq, false,
                                    ),
                                );
                            }
                        }
                        self.discard_path(path_id, now);
                    }
                }
            }
        }
    }
}
2495
2496 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2508 self.close_inner(
2509 now,
2510 Close::Application(frame::ApplicationClose { error_code, reason }),
2511 )
2512 }
2513
2514 fn close_inner(&mut self, now: Instant, reason: Close) {
2530 let was_closed = self.state.is_closed();
2531 if !was_closed {
2532 self.close_common();
2533 self.set_close_timer(now);
2534 self.connection_close_pending = true;
2535 self.state.move_to_closed_local(reason);
2536 }
2537 }
2538
2539 pub fn datagrams(&mut self) -> Datagrams<'_> {
2541 Datagrams { conn: self }
2542 }
2543
2544 pub fn stats(&mut self) -> ConnectionStats {
2546 let mut stats = self.partial_stats.clone();
2547
2548 for path_stats in self.path_stats.iter_stats() {
2549 stats += *path_stats;
2554 }
2555
2556 stats
2557 }
2558
2559 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2561 let path = self.paths.get(&path_id)?;
2562 let stats = self.path_stats.for_path(path_id);
2563 stats.rtt = path.data.rtt.get();
2564 stats.cwnd = path.data.congestion.window();
2565 stats.current_mtu = path.data.mtud.current_mtu();
2566 Some(*stats)
2567 }
2568
2569 pub fn ping(&mut self) {
2573 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2576 path_data.ping_pending = true;
2577 }
2578 }
2579
2580 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2584 let path_data = self.spaces[self.highest_space]
2585 .number_spaces
2586 .get_mut(&path)
2587 .ok_or(ClosedPath { _private: () })?;
2588 path_data.ping_pending = true;
2589 Ok(())
2590 }
2591
2592 pub fn force_key_update(&mut self) {
2596 if !self.state.is_established() {
2597 debug!("ignoring forced key update in illegal state");
2598 return;
2599 }
2600 if self.crypto_state.prev_crypto.is_some() {
2601 debug!("ignoring redundant forced key update");
2604 return;
2605 }
2606 self.crypto_state.update_keys(None, false);
2607 }
2608
2609 pub fn crypto_session(&self) -> &dyn crypto::Session {
2611 self.crypto_state.session.as_ref()
2612 }
2613
2614 pub fn is_handshaking(&self) -> bool {
2624 self.state.is_handshake()
2625 }
2626
2627 pub fn is_closed(&self) -> bool {
2638 self.state.is_closed()
2639 }
2640
2641 pub fn is_drained(&self) -> bool {
2646 self.state.is_drained()
2647 }
2648
2649 pub fn accepted_0rtt(&self) -> bool {
2653 self.crypto_state.accepted_0rtt
2654 }
2655
2656 pub fn has_0rtt(&self) -> bool {
2658 self.crypto_state.zero_rtt_enabled
2659 }
2660
2661 pub fn has_pending_retransmits(&self) -> bool {
2663 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2664 }
2665
2666 pub fn side(&self) -> Side {
2668 self.side.side()
2669 }
2670
2671 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2673 self.path(path_id)
2674 .map(|path_data| {
2675 path_data
2676 .last_observed_addr_report
2677 .as_ref()
2678 .map(|observed| observed.socket_addr())
2679 })
2680 .ok_or(ClosedPath { _private: () })
2681 }
2682
2683 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2685 self.path(path_id).map(|d| d.rtt.get())
2686 }
2687
2688 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2690 self.path(path_id).map(|d| d.congestion.as_ref())
2691 }
2692
2693 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2698 self.streams.set_max_concurrent(dir, count);
2699 let pending = &mut self.spaces[SpaceId::Data].pending;
2702 self.streams.queue_max_stream_id(pending);
2703 }
2704
2705 pub fn set_max_concurrent_paths(
2715 &mut self,
2716 now: Instant,
2717 count: NonZeroU32,
2718 ) -> Result<(), MultipathNotNegotiated> {
2719 if !self.is_multipath_negotiated() {
2720 return Err(MultipathNotNegotiated { _private: () });
2721 }
2722 self.max_concurrent_paths = count;
2723
2724 let in_use_count = self
2725 .local_max_path_id
2726 .next()
2727 .saturating_sub(self.abandoned_paths.len() as u32)
2728 .as_u32();
2729 let extra_needed = count.get().saturating_sub(in_use_count);
2730 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2731
2732 self.set_max_path_id(now, new_max_path_id);
2733
2734 Ok(())
2735 }
2736
2737 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2739 if max_path_id <= self.local_max_path_id {
2740 return;
2741 }
2742
2743 self.local_max_path_id = max_path_id;
2744 self.spaces[SpaceId::Data].pending.max_path_id = true;
2745
2746 self.issue_first_path_cids(now);
2747 }
2748
2749 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2755 self.streams.max_concurrent(dir)
2756 }
2757
2758 pub fn set_send_window(&mut self, send_window: u64) {
2760 self.streams.set_send_window(send_window);
2761 }
2762
2763 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2765 if self.streams.set_receive_window(receive_window) {
2766 self.spaces[SpaceId::Data].pending.max_data = true;
2767 }
2768 }
2769
2770 pub fn is_multipath_negotiated(&self) -> bool {
2775 !self.is_handshaking()
2776 && self.config.max_concurrent_multipath_paths.is_some()
2777 && self.peer_params.initial_max_path_id.is_some()
2778 }
2779
2780 fn on_ack_received(
2781 &mut self,
2782 now: Instant,
2783 space: SpaceId,
2784 ack: frame::Ack,
2785 ) -> Result<(), TransportError> {
2786 let path = PathId::ZERO;
2788 self.inner_on_ack_received(now, space, path, ack)
2789 }
2790
2791 fn on_path_ack_received(
2792 &mut self,
2793 now: Instant,
2794 space: SpaceId,
2795 path_ack: frame::PathAck,
2796 ) -> Result<(), TransportError> {
2797 let (ack, path) = path_ack.into_ack();
2798 self.inner_on_ack_received(now, space, path, ack)
2799 }
2800
/// Processes an acknowledgement for packets sent in `space` on `path`.
///
/// Updates the largest acknowledged packet, removes newly acknowledged packets from the
/// sent-packet tracking, informs MTU discovery, ACK frequency and the congestion
/// controller, takes an RTT sample when appropriate, runs loss detection, processes ECN
/// feedback and finally re-arms the loss detection timer.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` if the path ID was never used, or if the ACK covers a
/// packet number that has not been sent. Errors from `check_ack` are propagated.
fn inner_on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    path: PathId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    // No packet number space for this path: ignore if the path was abandoned,
    // otherwise the peer referenced a path ID that was never used.
    if !self.spaces[space].number_spaces.contains_key(&path) {
        if self.abandoned_paths.contains(&path) {
            trace!("silently ignoring PATH_ACK on discarded path");
            return Ok(());
        } else {
            return Err(TransportError::PROTOCOL_VIOLATION(
                "received PATH_ACK with path ID never used",
            ));
        }
    }
    // The largest acknowledged number must be below the next number to be sent.
    if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Track whether this ACK advances the largest acknowledged packet; if so, also
    // remember that packet's send time when it is still tracked.
    let new_largest = {
        let space = &mut self.spaces[space].for_path(path);
        if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
            space.largest_acked_packet = Some(ack.largest);
            if let Some(info) = space.sent_packets.get(ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    // An ACK covering all packets previously deemed lost is reported to the
    // congestion controller as a spurious congestion event.
    if self.detect_spurious_loss(&ack, space, path) {
        self.path_stats.for_path(path).spurious_congestion_events += 1;
        self.path_data_mut(path)
            .congestion
            .on_spurious_congestion_event();
    }

    // Collect only the packet numbers that are still tracked as sent.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.spaces[space].for_path(path).check_ack(range.clone())?;
        for (pn, _) in self.spaces[space]
            .for_path(path)
            .sent_packets
            .iter_range(range)
        {
            newly_acked.insert_one(pn);
        }
    }

    // Nothing new was acknowledged.
    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].for_path(path).take(packet) {
            // The acknowledged packet carried ACKs itself; pending ACK state below
            // the numbers it acknowledged is dropped for each referenced path.
            for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                    pns.pending_acks.subtract_below(*acked_pn);
                }
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery see the acknowledged packet; on an MTU change the
            // congestion controller is told the new MTU.
            let path_data = self.path_data_mut(path);
            let mtu_updated = path_data.mtud.on_acked(space.kind(), packet, info.size);
            if mtu_updated {
                path_data
                    .congestion
                    .on_mtu_update(path_data.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(path, packet);

            self.on_packet_acked(now, path, info);
        }
    }

    // Tell the congestion controller that this batch of ACKs is complete.
    let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
    let path_data = self.path_data_mut(path);
    let app_limited = path_data.app_limited;
    let in_flight = path_data.in_flight.bytes;

    path_data
        .congestion
        .on_end_acks(now, in_flight, app_limited, largest_ackd);

    // An RTT sample is taken only if the largest acknowledged packet advanced and at
    // least one newly acknowledged packet was ack-eliciting.
    if new_largest && ack_eliciting_acked {
        // Outside the Data space the ACK delay is taken as zero; inside it the
        // reported delay is capped at the peer's maximum ACK delay.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = now.saturating_duration_since(
            self.spaces[space].for_path(path).largest_acked_packet_sent,
        );

        let next_pn = self.spaces[space].for_path(path).next_packet_number;
        let path_data = self.path_data_mut(path);
        path_data.rtt.update(ack_delay, rtt);
        // Remember the first packet number used after the first RTT sample, if unset.
        if path_data.first_packet_after_rtt_sample.is_none() {
            path_data.first_packet_after_rtt_sample = Some((space.kind(), next_pn));
        }
    }

    // Runs before `pto_count` is reset below.
    self.detect_lost_packets(now, space, path, true);

    if self.peer_completed_handshake_address_validation() {
        self.path_data_mut(path).pto_count = 0;
    }

    if self.path_data(path).sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only examined when this ACK advanced the largest
            // acknowledged packet.
            if new_largest {
                let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // An ACK without ECN counts disables ECN on this path.
            debug!("ECN not acknowledged by peer");
            self.path_data_mut(path).sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now, path);
    Ok(())
}
2964
2965 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2966 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2967
2968 if lost_packets.is_empty() {
2969 return false;
2970 }
2971
2972 for range in ack.iter() {
2973 let spurious_losses: Vec<u64> = lost_packets
2974 .iter_range(range.clone())
2975 .map(|(pn, _info)| pn)
2976 .collect();
2977
2978 for pn in spurious_losses {
2979 lost_packets.remove(pn);
2980 }
2981 }
2982
2983 lost_packets.is_empty()
2988 }
2989
2990 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2995 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2996
2997 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2998 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2999 }
3000
3001 fn process_ecn(
3003 &mut self,
3004 now: Instant,
3005 space: SpaceId,
3006 path: PathId,
3007 newly_acked: u64,
3008 ecn: frame::EcnCounts,
3009 largest_sent_time: Instant,
3010 ) {
3011 match self.spaces[space]
3012 .for_path(path)
3013 .detect_ecn(newly_acked, ecn)
3014 {
3015 Err(e) => {
3016 debug!("halting ECN due to verification failure: {}", e);
3017
3018 self.path_data_mut(path).sending_ecn = false;
3019 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
3022 }
3023 Ok(false) => {}
3024 Ok(true) => {
3025 self.path_stats.for_path(path).congestion_events += 1;
3026 self.path_data_mut(path).congestion.on_congestion_event(
3027 now,
3028 largest_sent_time,
3029 false,
3030 true,
3031 0,
3032 );
3033 }
3034 }
3035 }
3036
3037 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
3040 let path = self.path_data_mut(path_id);
3041 let app_limited = path.app_limited;
3042 path.remove_in_flight(&info);
3043 if info.ack_eliciting && info.path_generation == path.generation() {
3044 let rtt = path.rtt;
3048 path.congestion
3049 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
3050 }
3051
3052 if let Some(retransmits) = info.retransmits.get() {
3054 for (id, _) in retransmits.reset_stream.iter() {
3055 self.streams.reset_acked(*id);
3056 }
3057 }
3058
3059 for frame in info.stream_frames {
3060 self.streams.received_ack_of(frame);
3061 }
3062 }
3063
3064 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceKind) {
3065 let start = if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) {
3066 now
3067 } else {
3068 self.crypto_state
3069 .prev_crypto
3070 .as_ref()
3071 .expect("no previous keys")
3072 .end_packet
3073 .as_ref()
3074 .expect("update not acknowledged yet")
3075 .1
3076 };
3077
3078 self.timers.set(
3080 Timer::Conn(ConnTimer::KeyDiscard),
3081 start + self.max_pto_for_space(space) * 3,
3082 self.qlog.with_time(now),
3083 );
3084 }
3085
3086 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
3099 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
3100 self.detect_lost_packets(now, pn_space, path_id, false);
3102 self.set_loss_detection_timer(now, path_id);
3103 return;
3104 }
3105
3106 let Some((_, space)) = self.pto_time_and_space(now, path_id) else {
3107 error!(%path_id, "PTO expired while unset");
3108 return;
3109 };
3110 trace!(
3111 in_flight = self.path_data(path_id).in_flight.bytes,
3112 count = self.path_data(path_id).pto_count,
3113 ?space,
3114 %path_id,
3115 "PTO fired"
3116 );
3117
3118 let count = match self.path_data(path_id).in_flight.ack_eliciting {
3119 0 => {
3122 debug_assert!(!self.peer_completed_handshake_address_validation());
3123 1
3124 }
3125 _ => 2,
3127 };
3128 let pns = self.spaces[space].for_path(path_id);
3129 pns.loss_probes = pns.loss_probes.saturating_add(count);
3130 let path_data = self.path_data_mut(path_id);
3131 path_data.pto_count = path_data.pto_count.saturating_add(1);
3132 self.set_loss_detection_timer(now, path_id);
3133 }
3134
    /// Detects lost packets among those sent on `path_id` in `pn_space` with a packet number
    /// below the largest acknowledged one, and hands them to `handle_lost_packets`.
    ///
    /// A packet is declared lost when at least `loss_delay` has elapsed since it was sent, or
    /// when the largest acknowledged packet number is at least `config.packet_threshold` ahead
    /// of it. The space's `loss_time` is cleared on entry and, if some scanned packet is not
    /// yet lost, set to the instant at which the first such packet crosses the time threshold.
    ///
    /// `due_to_ack` gates persistent congestion detection: it is only evaluated when `true`.
    ///
    /// # Panics
    ///
    /// Panics if no packet has been acknowledged yet in `pn_space` on `path_id`.
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Time threshold: the conservative RTT scaled by the configured factor, never below
        // the timer granularity.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // The persistent congestion period is always derived from the Data-space PTO (which
        // includes the max ACK delay), regardless of `pn_space`.
        let congestion_period = self
            .pto(SpaceKind::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            if prev_packet != Some(packet.wrapping_sub(1)) {
                // The packet numbers still tracked as sent are not contiguous here, which
                // ends the current run of consecutive losses.
                // NOTE(review): presumably the missing packet was acknowledged — confirm.
                persistent_congestion_start = None;
            }

            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                if Some(packet) == in_flight_mtu_probe {
                    // A lost MTU probe is reported separately and is neither added to
                    // `lost_packets` nor counted in `size_of_lost_packets`.
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Two ack-eliciting packets in the same uninterrupted run of
                            // losses were sent more than `congestion_period` apart.
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // A run may only begin with a packet that comes after the one
                            // recorded in `first_packet_after_rtt_sample`.
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space.kind(), packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // Only the first not-yet-lost packet encountered sets the loss time; later
                // ones leave it untouched.
                if space.loss_time.is_none() {
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                // A packet that is not lost ends the current run of consecutive losses.
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
3251
3252 fn discard_path(&mut self, path_id: PathId, now: Instant) {
3254 trace!(%path_id, "dropping path state");
3255 let path = self.path_data(path_id);
3256 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
3257
3258 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
3260 .for_path(path_id)
3261 .sent_packets
3262 .iter()
3263 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
3264 .map(|(pn, info)| {
3265 size_of_lost_packets += info.size as u64;
3266 pn
3267 })
3268 .collect();
3269
3270 if !lost_pns.is_empty() {
3271 trace!(
3272 %path_id,
3273 count = lost_pns.len(),
3274 lost_bytes = size_of_lost_packets,
3275 "packets lost on path abandon"
3276 );
3277 self.handle_lost_packets(
3278 SpaceId::Data,
3279 path_id,
3280 now,
3281 lost_pns,
3282 in_flight_mtu_probe,
3283 Duration::ZERO,
3284 false,
3285 size_of_lost_packets,
3286 );
3287 }
3288 let path_stats = self.path_stats.discard(&path_id);
3291 self.partial_stats += path_stats;
3292 self.paths.remove(&path_id);
3293 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
3294
3295 self.events.push_back(
3296 PathEvent::Discarded {
3297 id: path_id,
3298 path_stats: Box::new(path_stats),
3299 }
3300 .into(),
3301 );
3302 }
3303
    /// Processes packets that have been declared lost on `path_id` in `pn_space`.
    ///
    /// * `lost_packets` — packet numbers declared lost, in ascending order (checked with a
    ///   debug assertion). MTU probes are expected to be passed via `lost_mtu_probe` instead.
    /// * `lost_mtu_probe` — packet number of a lost MTU probe, if any. It is always looked up
    ///   in the Data space and does not trigger a congestion event here.
    /// * `loss_delay` — only forwarded to the qlog "packet lost" events.
    /// * `in_persistent_congestion` — forwarded to the congestion controller.
    /// * `size_of_lost_packets` — total size in bytes of `lost_packets`, used for statistics
    ///   and forwarded to the congestion controller.
    ///
    /// # Panics
    ///
    /// Panics if the largest entry of `lost_packets` is not tracked as sent, if the path does
    /// not exist, or if `lost_mtu_probe` is not tracked as sent in the Data space.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(lost_packets.is_sorted(), "lost_packets must be sorted");

        // Expire old entries from the record of lost packets before adding new ones.
        self.drain_lost_packets(now, pn_space, path_id);

        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Snapshot used below to tell whether any lost packet counted towards the bytes
            // in flight.
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.for_path(path_id);
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                // Packets that are no longer tracked as sent are silently skipped.
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space.kind(), now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the packet's contents for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Remember the packet as lost so a later acknowledgement of it can be
                // recognised (see `detect_spurious_loss`).
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                // Propagate the MTU now reported by MTU discovery to the congestion
                // controller, and drop queued datagrams that no longer fit.
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                if let Some(max_datagram_size) = self.datagrams().max_size()
                    && self.datagrams.drop_oversized(max_datagram_size)
                    && self.datagrams.send_blocked
                {
                    self.datagrams.send_blocked = false;
                    self.events.push_back(Event::DatagramsUnblocked);
                }
                self.path_stats.for_path(path_id).black_holes_detected += 1;
            }

            // A congestion event is only raised if removing the lost packets changed the
            // number of bytes in flight.
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats.for_path(path_id).congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // A lost MTU probe is handled separately from regular losses: it is removed from the
        // in-flight accounting and reported to MTU discovery and the path statistics only.
        if let Some(packet) = lost_mtu_probe {
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap();
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats.for_path(path_id).lost_plpmtud_probes += 1;
        }
    }
3409
3410 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
3416 SpaceId::iter()
3417 .filter_map(|id| {
3418 self.spaces[id]
3419 .number_spaces
3420 .get(&path_id)
3421 .and_then(|pns| pns.loss_time)
3422 .map(|time| (time, id))
3423 })
3424 .min_by_key(|&(time, _)| time)
3425 }
3426
    /// Computes when the probe timeout for `path_id` should fire and in which packet number
    /// space the probes should be sent.
    ///
    /// Returns `None` if the path does not exist or if no packet number space currently
    /// qualifies for a PTO.
    fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
        let path = self.path(path_id)?;
        let pto_count = path.pto_count;

        // Upper bound on how much the PTO may grow per backoff step:
        // - 1.5x the RTT when the RTT exceeds `SLOW_RTT_THRESHOLD`;
        // - `MAX_PTO_FAST_INTERVAL` when the effective idle timeout (the path's own, falling
        //   back to the connection's) is at most `MIN_IDLE_FOR_FAST_PTO`;
        // - `MAX_PTO_INTERVAL` otherwise.
        let max_interval = if path.rtt.get() > SLOW_RTT_THRESHOLD {
            (path.rtt.get() * 3) / 2
        } else if let Some(idle) = path.idle_timeout.or(self.idle_timeout)
            && idle <= MIN_IDLE_FOR_FAST_PTO
        {
            MAX_PTO_FAST_INTERVAL
        } else {
            MAX_PTO_INTERVAL
        };

        // Special case for path zero: nothing ack-eliciting is in flight and the peer has not
        // yet completed handshake address validation. A PTO is armed relative to `now`.
        if path_id == PathId::ZERO
            && path.in_flight.ack_eliciting == 0
            && !self.peer_completed_handshake_address_validation()
        {
            // Probe in the Handshake space once it has been reached, otherwise in Initial.
            let space = match self.highest_space {
                SpaceKind::Handshake => SpaceId::Handshake,
                _ => SpaceId::Initial,
            };

            // Exponential backoff with a capped exponent, additionally clamped to
            // `max_interval`.
            let backoff = 2u32.pow(path.pto_count.min(MAX_BACKOFF_EXPONENT));
            let duration = path.rtt.pto_base() * backoff;
            let duration = duration.min(max_interval);
            return Some((now + duration, space));
        }

        let mut result = None;
        for space in SpaceId::iter() {
            let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
                continue;
            };

            // The Data space does not take part until the handshake is confirmed.
            if space == SpaceId::Data && !self.is_handshake_confirmed() {
                continue;
            }

            if !pns.has_in_flight() {
                continue;
            }

            let duration = {
                // The max ACK delay is only added for the Data space.
                let max_ack_delay = if space == SpaceId::Data {
                    self.ack_frequency.max_ack_delay_for_pto()
                } else {
                    Duration::ZERO
                };
                let pto_base = path.rtt.pto_base() + max_ack_delay;
                let mut duration = pto_base;
                // Replay the backoff once per PTO that already fired: each step doubles the
                // base (exponent capped at `MAX_BACKOFF_EXPONENT`) but may grow by at most
                // `max_interval` over the previous step.
                for i in 1..=pto_count {
                    let exponential_duration = pto_base * 2u32.pow(i.min(MAX_BACKOFF_EXPONENT));
                    let max_duration = duration + max_interval;
                    duration = exponential_duration.min(max_duration);
                }
                duration
            };

            let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
                continue;
            };
            let pto = last_ack_eliciting + duration;
            // Keep the earliest PTO across spaces.
            if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
                // No PTO is reported while the path is blocked by the anti-amplification
                // check.
                if path.anti_amplification_blocked(1) {
                    continue;
                }
                // No PTO is reported when the path has nothing ack-eliciting in flight.
                if path.in_flight.ack_eliciting == 0 {
                    continue;
                }
                result = Some((pto, space));
            }
        }
        result
    }
3530
3531 fn peer_completed_handshake_address_validation(&self) -> bool {
3533 if self.side.is_server() || self.state.is_closed() {
3534 return true;
3535 }
3536 self.spaces[SpaceId::Handshake]
3540 .path_space(PathId::ZERO)
3541 .and_then(|pns| pns.largest_acked_packet)
3542 .is_some()
3543 || self.spaces[SpaceId::Data]
3544 .path_space(PathId::ZERO)
3545 .and_then(|pns| pns.largest_acked_packet)
3546 .is_some()
3547 || (self.crypto_state.has_keys(EncryptionLevel::OneRtt)
3548 && !self.crypto_state.has_keys(EncryptionLevel::Handshake))
3549 }
3550
3551 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3559 if self.state.is_closed() {
3560 return;
3564 }
3565
3566 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3567 self.timers.set(
3569 Timer::PerPath(path_id, PathTimer::LossDetection),
3570 loss_time,
3571 self.qlog.with_time(now),
3572 );
3573 return;
3574 }
3575
3576 if !self.abandoned_paths.contains(&path_id)
3579 && let Some((timeout, _)) = self.pto_time_and_space(now, path_id)
3580 {
3581 self.timers.set(
3582 Timer::PerPath(path_id, PathTimer::LossDetection),
3583 timeout,
3584 self.qlog.with_time(now),
3585 );
3586 } else {
3587 self.timers.stop(
3588 Timer::PerPath(path_id, PathTimer::LossDetection),
3589 self.qlog.with_time(now),
3590 );
3591 }
3592 }
3593
3594 fn max_pto_for_space(&self, space: SpaceKind) -> Duration {
3598 self.paths
3599 .keys()
3600 .map(|path_id| self.pto(space, *path_id))
3601 .max()
3602 .unwrap_or_else(|| {
3603 let rtt = self.config.initial_rtt;
3607 let max_ack_delay = match space {
3608 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3609 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3610 };
3611 rtt + cmp::max(4 * (rtt / 2), TIMER_GRANULARITY) + max_ack_delay
3612 })
3613 }
3614
3615 fn pto(&self, space: SpaceKind, path_id: PathId) -> Duration {
3620 let max_ack_delay = match space {
3621 SpaceKind::Initial | SpaceKind::Handshake => Duration::ZERO,
3622 SpaceKind::Data => self.ack_frequency.max_ack_delay_for_pto(),
3623 };
3624 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3625 }
3626
    /// Bookkeeping performed for every packet that has been successfully authenticated.
    ///
    /// * `space_id` — packet number space the packet belongs to.
    /// * `ecn` — ECN codepoint the packet arrived with, if any.
    /// * `packet_number` — the packet's number; when `None`, only the timer, ECN and counter
    ///   updates in the first half of the function are performed.
    /// * `spin` — spin bit value carried by the packet.
    /// * `is_1rtt` — whether the packet was a 1-RTT packet.
    /// * `remote` — network path the packet arrived on.
    fn on_packet_authenticated(
        &mut self,
        now: Instant,
        space_id: SpaceKind,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet_number: Option<u64>,
        spin: bool,
        is_1rtt: bool,
        remote: &FourTuple,
    ) {
        // Whether the packet arrived on the network path currently associated with `path_id`.
        let is_on_path = *remote == self.path_data(path_id).network_path;

        self.total_authed_packets += 1;
        self.reset_keep_alive(path_id, now);
        self.reset_idle_timeout(now, space_id, path_id);
        self.path_data_mut(path_id).permit_idle_reset = true;

        // ECN state is only updated for packets received on the path's current network path.
        if is_on_path {
            self.receiving_ecn |= ecn.is_some();
            if let Some(x) = ecn {
                let space = &mut self.spaces[space_id];
                space.for_path(path_id).ecn_counters += x;

                // A CE mark makes the next acknowledgement immediate.
                if x.is_ce() {
                    space
                        .for_path(path_id)
                        .pending_acks
                        .set_immediate_ack_required();
                }
            }
        }

        let Some(packet_number) = packet_number else {
            return;
        };
        match &self.side {
            ConnectionSide::Client { .. } => {
                // Once a Handshake packet has been authenticated during the handshake, the
                // server is no longer allowed to migrate.
                if space_id == SpaceKind::Handshake
                    && let Some(hs) = self.state.as_handshake_mut()
                {
                    hs.allow_server_migration = false;
                }
            }
            ConnectionSide::Server { .. } => {
                // The Initial space is discarded on receipt of a Handshake packet while
                // Initial keys are still held.
                if self.crypto_state.has_keys(EncryptionLevel::Initial)
                    && space_id == SpaceKind::Handshake
                {
                    self.discard_space(now, SpaceKind::Initial);
                }
                // Receiving a 1-RTT packet while 0-RTT keys are still held schedules their
                // discard.
                if self.crypto_state.has_keys(EncryptionLevel::ZeroRtt) && is_1rtt {
                    self.set_key_discard_timer(now, space_id)
                }
            }
        }
        let space = self.spaces[space_id].for_path(path_id);

        // Record the packet so that it gets acknowledged.
        space.pending_acks.insert_one(packet_number, now);
        if packet_number >= space.largest_received_packet_number.unwrap_or_default() {
            space.largest_received_packet_number = Some(packet_number);

            // The outgoing spin bit follows the packet with the largest packet number seen on
            // the path's current network path; it is inverted on the client side.
            if is_on_path {
                self.spin = self.side.is_client() ^ spin;
            }
        }
    }
3707
3708 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceKind, path_id: PathId) {
3713 if let Some(timeout) = self.idle_timeout {
3715 if self.state.is_closed() {
3716 self.timers
3717 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3718 } else {
3719 let dt = cmp::max(timeout, 3 * self.max_pto_for_space(space));
3720 self.timers.set(
3721 Timer::Conn(ConnTimer::Idle),
3722 now + dt,
3723 self.qlog.with_time(now),
3724 );
3725 }
3726 }
3727
3728 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3730 if self.state.is_closed() {
3731 self.timers.stop(
3732 Timer::PerPath(path_id, PathTimer::PathIdle),
3733 self.qlog.with_time(now),
3734 );
3735 } else {
3736 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3737 self.timers.set(
3738 Timer::PerPath(path_id, PathTimer::PathIdle),
3739 now + dt,
3740 self.qlog.with_time(now),
3741 );
3742 }
3743 }
3744 }
3745
3746 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3748 if !self.state.is_established() {
3749 return;
3750 }
3751
3752 if let Some(interval) = self.config.keep_alive_interval {
3753 self.timers.set(
3754 Timer::Conn(ConnTimer::KeepAlive),
3755 now + interval,
3756 self.qlog.with_time(now),
3757 );
3758 }
3759
3760 if let Some(interval) = self.path_data(path_id).keep_alive {
3761 self.timers.set(
3762 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3763 now + interval,
3764 self.qlog.with_time(now),
3765 );
3766 }
3767 }
3768
3769 fn reset_cid_retirement(&mut self, now: Instant) {
3771 if let Some((_path, t)) = self.next_cid_retirement() {
3772 self.timers.set(
3773 Timer::Conn(ConnTimer::PushNewCid),
3774 t,
3775 self.qlog.with_time(now),
3776 );
3777 }
3778 }
3779
3780 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3782 self.local_cid_state
3783 .iter()
3784 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3785 .min_by_key(|(_path_id, timeout)| *timeout)
3786 }
3787
3788 pub(crate) fn handle_first_packet(
3793 &mut self,
3794 now: Instant,
3795 network_path: FourTuple,
3796 ecn: Option<EcnCodepoint>,
3797 packet_number: u64,
3798 packet: InitialPacket,
3799 remaining: Option<BytesMut>,
3800 ) -> Result<(), ConnectionError> {
3801 let span = trace_span!("first recv");
3802 let _guard = span.enter();
3803 debug_assert!(self.side.is_server());
3804 let len = packet.header_data.len() + packet.payload.len();
3805 let path_id = PathId::ZERO;
3806 self.path_data_mut(path_id).total_recvd = len as u64;
3807
3808 if let Some(hs) = self.state.as_handshake_mut() {
3809 hs.expected_token = packet.header.token.clone();
3810 } else {
3811 unreachable!("first packet must be delivered in Handshake state");
3812 }
3813
3814 self.on_packet_authenticated(
3816 now,
3817 SpaceKind::Initial,
3818 path_id,
3819 ecn,
3820 Some(packet_number),
3821 false,
3822 false,
3823 &network_path,
3824 );
3825
3826 let packet: Packet = packet.into();
3827
3828 let mut qlog = QlogRecvPacket::new(len);
3829 qlog.header(&packet.header, Some(packet_number), path_id);
3830
3831 self.process_decrypted_packet(
3832 now,
3833 network_path,
3834 path_id,
3835 Some(packet_number),
3836 packet,
3837 &mut qlog,
3838 )?;
3839 self.qlog.emit_packet_received(qlog, now);
3840 if let Some(data) = remaining {
3841 self.handle_coalesced(now, network_path, path_id, ecn, data);
3842 }
3843
3844 self.qlog.emit_recovery_metrics(
3845 path_id,
3846 &mut self.paths.get_mut(&path_id).unwrap().data,
3847 now,
3848 );
3849
3850 Ok(())
3851 }
3852
3853 fn init_0rtt(&mut self, now: Instant) {
3854 let Some((header, packet)) = self.crypto_state.session.early_crypto() else {
3855 return;
3856 };
3857 if self.side.is_client() {
3858 match self.crypto_state.session.transport_parameters() {
3859 Ok(params) => {
3860 let params = params
3861 .expect("crypto layer didn't supply transport parameters with ticket");
3862 let params = TransportParameters {
3864 initial_src_cid: None,
3865 original_dst_cid: None,
3866 preferred_address: None,
3867 retry_src_cid: None,
3868 stateless_reset_token: None,
3869 min_ack_delay: None,
3870 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3871 max_ack_delay: TransportParameters::default().max_ack_delay,
3872 initial_max_path_id: None,
3873 ..params
3874 };
3875 self.set_peer_params(params);
3876 self.qlog.emit_peer_transport_params_restored(self, now);
3877 }
3878 Err(e) => {
3879 error!("session ticket has malformed transport parameters: {}", e);
3880 return;
3881 }
3882 }
3883 }
3884 trace!("0-RTT enabled");
3885 self.crypto_state.enable_zero_rtt(header, packet);
3886 }
3887
    /// Feeds a received CRYPTO frame into the crypto stream of `space` and passes any data
    /// that becomes readable to the crypto session.
    ///
    /// `payload_len` is forwarded to the crypto stream's `insert`.
    ///
    /// # Errors
    ///
    /// * `PROTOCOL_VIOLATION` if the frame carries new data for a space below the one
    ///   currently expected.
    /// * `CRYPTO_BUFFER_EXCEEDED` if the frame ends more than `config.crypto_buffer_size`
    ///   bytes past the data already read.
    /// * Any error returned by the crypto session while reading the handshake data.
    fn read_crypto(
        &mut self,
        space: SpaceId,
        crypto: &frame::Crypto,
        payload_len: usize,
    ) -> Result<(), TransportError> {
        // The space in which new CRYPTO data is currently expected: Data once the connection
        // has left the handshake state, otherwise Initial until a higher space has been
        // reached, and Handshake after that.
        let expected = if !self.state.is_handshake() {
            SpaceId::Data
        } else if self.highest_space == SpaceKind::Initial {
            SpaceId::Initial
        } else {
            SpaceId::Handshake
        };
        debug_assert!(space <= expected, "received out-of-order CRYPTO data");

        // Data for an earlier space is tolerated only if it lies entirely within what has
        // already been read from that space's stream.
        let end = crypto.offset + crypto.data.len() as u64;
        if space < expected
            && end
                > self.crypto_state.spaces[space.kind()]
                    .crypto_stream
                    .bytes_read()
        {
            warn!(
                "received new {:?} CRYPTO data when expecting {:?}",
                space, expected
            );
            return Err(TransportError::PROTOCOL_VIOLATION(
                "new data at unexpected encryption level",
            ));
        }

        // Bound how far past the read position the frame may extend.
        let crypto_space = &mut self.crypto_state.spaces[space.kind()];
        let max = end.saturating_sub(crypto_space.crypto_stream.bytes_read());
        if max > self.config.crypto_buffer_size as u64 {
            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
        }

        crypto_space
            .crypto_stream
            .insert(crypto.offset, crypto.data.clone(), payload_len);
        // Hand every chunk that is now readable to the crypto session.
        while let Some(chunk) = crypto_space.crypto_stream.read(usize::MAX, true) {
            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
            if self.crypto_state.session.read_handshake(&chunk.bytes)? {
                self.events.push_back(Event::HandshakeDataReady);
            }
        }

        Ok(())
    }
3942
    /// Drains outgoing handshake data from the crypto session and queues it as CRYPTO frames
    /// in the matching packet number space.
    ///
    /// Whenever the session hands back new keys, the connection is upgraded to the next space
    /// (Initial to Handshake, Handshake to Data). The loop ends once the session produces no
    /// data and no upgrade happened in that iteration.
    ///
    /// # Panics
    ///
    /// Panics if the session returns new keys while the highest space is already Data.
    fn write_crypto(&mut self) {
        loop {
            // Data written in this iteration belongs to the space that was the highest
            // *before* any upgrade performed below.
            let space = self.highest_space;
            let mut outgoing = Vec::new();
            if let Some(crypto) = self.crypto_state.session.write_handshake(&mut outgoing) {
                match space {
                    SpaceKind::Initial => {
                        self.upgrade_crypto(SpaceKind::Handshake, crypto);
                    }
                    SpaceKind::Handshake => {
                        self.upgrade_crypto(SpaceKind::Data, crypto);
                    }
                    SpaceKind::Data => unreachable!("got updated secrets during 1-RTT"),
                }
            }
            if outgoing.is_empty() {
                if space == self.highest_space {
                    // Nothing written and no upgrade: the session has nothing more to send.
                    break;
                } else {
                    // Keys were upgraded without any data being written; ask the session
                    // again at the new level.
                    continue;
                }
            }
            let offset = self.crypto_state.spaces[space].crypto_offset;
            let outgoing = Bytes::from(outgoing);
            // On the client, the first bytes written in the Initial space are remembered as
            // the client hello while the connection is in the handshake state.
            if let Some(hs) = self.state.as_handshake_mut()
                && space == SpaceKind::Initial
                && offset == 0
                && self.side.is_client()
            {
                hs.client_hello = Some(outgoing.clone());
            }
            self.crypto_state.spaces[space].crypto_offset += outgoing.len() as u64;
            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
            self.spaces[space].pending.crypto.push_back(frame::Crypto {
                offset,
                data: outgoing,
            });
        }
    }
3983
3984 fn upgrade_crypto(&mut self, space: SpaceKind, crypto: Keys) {
3986 debug_assert!(
3987 !self.crypto_state.has_keys(space.encryption_level()),
3988 "already reached packet space {space:?}"
3989 );
3990 trace!("{:?} keys ready", space);
3991 if space == SpaceKind::Data {
3992 self.crypto_state.next_crypto = Some(
3994 self.crypto_state
3995 .session
3996 .next_1rtt_keys()
3997 .expect("handshake should be complete"),
3998 );
3999 }
4000
4001 self.crypto_state.spaces[space].keys = Some(crypto);
4002 debug_assert!(space > self.highest_space);
4003 self.highest_space = space;
4004 if space == SpaceKind::Data && self.side.is_client() {
4005 self.crypto_state.discard_zero_rtt();
4007 }
4008 }
4009
4010 fn discard_space(&mut self, now: Instant, space: SpaceKind) {
4011 debug_assert!(space != SpaceKind::Data);
4012 trace!("discarding {:?} keys", space);
4013 if space == SpaceKind::Initial {
4014 if let ConnectionSide::Client { token, .. } = &mut self.side {
4016 *token = Bytes::new();
4017 }
4018 }
4019 self.crypto_state.spaces[space].keys = None;
4020 let space = &mut self.spaces[space];
4021 let pns = space.for_path(PathId::ZERO);
4022 pns.time_of_last_ack_eliciting_packet = None;
4023 pns.loss_time = None;
4024 pns.loss_probes = 0;
4025 let sent_packets = mem::take(&mut pns.sent_packets);
4026 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
4027 for (_, packet) in sent_packets.into_iter() {
4028 path.data.remove_in_flight(&packet);
4029 }
4030
4031 self.set_loss_detection_timer(now, PathId::ZERO)
4032 }
4033
4034 fn handle_coalesced(
4035 &mut self,
4036 now: Instant,
4037 network_path: FourTuple,
4038 path_id: PathId,
4039 ecn: Option<EcnCodepoint>,
4040 data: BytesMut,
4041 ) {
4042 self.path_data_mut(path_id)
4043 .inc_total_recvd(data.len() as u64);
4044 let mut remaining = Some(data);
4045 let cid_len = self
4046 .local_cid_state
4047 .values()
4048 .map(|cid_state| cid_state.cid_len())
4049 .next()
4050 .expect("one cid_state must exist");
4051 while let Some(data) = remaining {
4052 match PartialDecode::new(
4053 data,
4054 &FixedLengthConnectionIdParser::new(cid_len),
4055 &[self.version],
4056 self.endpoint_config.grease_quic_bit,
4057 ) {
4058 Ok((partial_decode, rest)) => {
4059 remaining = rest;
4060 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
4061 }
4062 Err(e) => {
4063 trace!("malformed header: {}", e);
4064 return;
4065 }
4066 }
4067 }
4068 }
4069
4070 fn handle_decode(
4071 &mut self,
4072 now: Instant,
4073 network_path: FourTuple,
4074 path_id: PathId,
4075 ecn: Option<EcnCodepoint>,
4076 partial_decode: PartialDecode,
4077 ) {
4078 let qlog = QlogRecvPacket::new(partial_decode.len());
4079 if let Some(decoded) = self
4080 .crypto_state
4081 .unprotect_header(partial_decode, self.peer_params.stateless_reset_token)
4082 {
4083 self.handle_packet(
4084 now,
4085 network_path,
4086 path_id,
4087 ecn,
4088 decoded.packet,
4089 decoded.stateless_reset,
4090 qlog,
4091 );
4092 }
4093 }
4094
    /// Handles one packet after header protection has been removed.
    ///
    /// The packet is decrypted, filtered (duplicates, short-header packets during the
    /// handshake, Initial packets with an unexpected token), run through the
    /// authenticated-packet bookkeeping and payload processing, and any resulting error is
    /// translated into a connection state transition.
    ///
    /// * `packet` — the packet, or `None` when none is available; `None` is treated like an
    ///   authentication failure unless `stateless_reset` is set.
    /// * `stateless_reset` — whether the datagram was identified as a stateless reset; this
    ///   takes precedence over the decryption outcome.
    /// * `qlog` — qlog record for the packet, emitted once the packet has been decrypted.
    fn handle_packet(
        &mut self,
        now: Instant,
        network_path: FourTuple,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
        mut qlog: QlogRecvPacket,
    ) {
        self.path_stats.for_path(path_id).udp_rx.ios += 1;

        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                network_path,
                packet.header.dst_cid(),
            );
        }

        if self.is_handshaking() {
            // Only path zero is accepted while the handshake is in progress.
            if path_id != PathId::ZERO {
                debug!(%network_path, %path_id, "discarding multipath packet during handshake");
                return;
            }
            // A packet from a different network path is only accepted during the handshake
            // while server migration is still allowed, in which case the path is switched to
            // the new network path.
            if network_path != self.path_data_mut(path_id).network_path {
                if let Some(hs) = self.state.as_handshake() {
                    if hs.allow_server_migration {
                        trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
                        self.path_data_mut(path_id).network_path = network_path;
                        self.qlog.emit_tuple_assigned(path_id, network_path, now);
                    } else {
                        debug!("discarding packet with unexpected remote during handshake");
                        return;
                    }
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            }
        }

        // Remembered so that state transitions caused by this packet can be detected below.
        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, path_id, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            // A stateless reset wins regardless of the decryption outcome.
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                // Authentication failures are counted against the integrity limit of the
                // highest space; the packet is silently dropped until the limit is exceeded.
                debug!("failed to authenticate packet");
                self.authentication_failures += 1;
                let integrity_limit = self
                    .crypto_state
                    .integrity_limit(self.highest_space)
                    .unwrap();
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    return;
                }
            }
            Ok((packet, number)) => {
                qlog.header(&packet.header, number, path_id);
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                // Duplicate detection only applies when the packet has a number and the
                // packet number space exists for this path.
                let dedup = self.spaces[packet.header.space()]
                    .path_space_mut(path_id)
                    .map(|pns| &mut pns.dedup);
                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                    debug!("discarding possible duplicate packet");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    trace!("dropping short packet during handshake");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else {
                    // On the server, an Initial packet received during the handshake must
                    // carry the token that is expected.
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header
                        && let Some(hs) = self.state.as_handshake()
                        && self.side.is_server()
                        && token != &hs.expected_token
                    {
                        warn!("discarding Initial with invalid retry token");
                        self.qlog.emit_packet_received(qlog, now);
                        return;
                    }

                    if !self.state.is_closed() {
                        // Only short-header packets carry a spin bit.
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };

                        // On the server, make sure state for the path exists unless the path
                        // was abandoned.
                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                            self.ensure_path(path_id, network_path, now, number);
                        }
                        if self.paths.contains_key(&path_id) {
                            self.on_packet_authenticated(
                                now,
                                packet.header.space(),
                                path_id,
                                ecn,
                                number,
                                spin,
                                packet.header.is_1rtt(),
                                &network_path,
                            );
                        }
                    }

                    let res = self.process_decrypted_packet(
                        now,
                        network_path,
                        path_id,
                        number,
                        packet,
                        &mut qlog,
                    );

                    self.qlog.emit_packet_received(qlog, now);
                    res
                }
            }
        };

        // Translate errors into connection state transitions.
        if let Err(conn_err) = result {
            match conn_err {
                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
                // A stateless reset or an exceeded integrity limit moves straight to drained.
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => {
                    self.state.move_to_drained(Some(conn_err));
                }
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    self.state.move_to_closed(err);
                }
                ConnectionError::VersionMismatch => {
                    self.state.move_to_draining(Some(conn_err));
                }
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
            };
        }

        // The connection became closed while handling this packet.
        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        // The connection became drained while handling this packet: notify the endpoint and
        // stop the close timer.
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            self.timers
                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
        }

        // In the closed state, a connection close is only flagged as pending when the packet
        // arrived on a known, validated path from that path's current network path.
        if matches!(self.state.as_type(), StateType::Closed) {
            if self
                .paths
                .get(&path_id)
                .map(|p| p.data.validated && p.data.network_path == network_path)
                .unwrap_or(false)
            {
                self.connection_close_pending = true;
            }
        }
    }
4322
4323 fn process_decrypted_packet(
4324 &mut self,
4325 now: Instant,
4326 network_path: FourTuple,
4327 path_id: PathId,
4328 number: Option<u64>,
4329 packet: Packet,
4330 qlog: &mut QlogRecvPacket,
4331 ) -> Result<(), ConnectionError> {
4332 if !self.paths.contains_key(&path_id) {
4333 trace!(%path_id, ?number, "discarding packet for unknown path");
4337 return Ok(());
4338 }
4339 let state = match self.state.as_type() {
4340 StateType::Established => {
4341 match packet.header.space() {
4342 SpaceKind::Data => self.process_payload(
4343 now,
4344 network_path,
4345 path_id,
4346 number.unwrap(),
4347 packet,
4348 qlog,
4349 )?,
4350 _ if packet.header.has_frames() => {
4351 self.process_early_payload(now, path_id, packet, qlog)?
4352 }
4353 _ => {
4354 trace!("discarding unexpected pre-handshake packet");
4355 }
4356 }
4357 return Ok(());
4358 }
4359 StateType::Closed => {
4360 for result in frame::Iter::new(packet.payload.freeze())? {
4361 let frame = match result {
4362 Ok(frame) => frame,
4363 Err(err) => {
4364 debug!("frame decoding error: {err:?}");
4365 continue;
4366 }
4367 };
4368 qlog.frame(&frame);
4369
4370 if let Frame::Padding = frame {
4371 continue;
4372 };
4373
4374 self.path_stats
4375 .for_path(path_id)
4376 .frame_rx
4377 .record(frame.ty());
4378
4379 if let Frame::Close(_error) = frame {
4380 self.state.move_to_draining(None);
4381 break;
4382 }
4383 }
4384 return Ok(());
4385 }
4386 StateType::Draining | StateType::Drained => return Ok(()),
4387 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
4388 };
4389
4390 match packet.header {
4391 Header::Retry {
4392 src_cid: remote_cid,
4393 ..
4394 } => {
4395 debug_assert_eq!(path_id, PathId::ZERO);
4396 if self.side.is_server() {
4397 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
4398 }
4399
4400 let is_valid_retry = self
4401 .remote_cids
4402 .get(&path_id)
4403 .map(|cids| cids.active())
4404 .map(|orig_dst_cid| {
4405 self.crypto_state.session.is_valid_retry(
4406 orig_dst_cid,
4407 &packet.header_data,
4408 &packet.payload,
4409 )
4410 })
4411 .unwrap_or_default();
4412 if self.total_authed_packets > 1
4413 || packet.payload.len() <= 16 || !is_valid_retry
4415 {
4416 trace!("discarding invalid Retry");
4417 return Ok(());
4425 }
4426
4427 trace!("retrying with CID {}", remote_cid);
4428 let client_hello = state.client_hello.take().unwrap();
4429 self.retry_src_cid = Some(remote_cid);
4430 self.remote_cids
4431 .get_mut(&path_id)
4432 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
4433 .update_initial_cid(remote_cid);
4434 self.remote_handshake_cid = remote_cid;
4435
4436 let space = &mut self.spaces[SpaceId::Initial];
4437 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
4438 self.on_packet_acked(now, PathId::ZERO, info);
4439 };
4440
4441 self.discard_space(now, SpaceKind::Initial); let crypto_space = &mut self.crypto_state.spaces[SpaceKind::Initial];
4444 crypto_space.keys = Some(
4445 self.crypto_state
4446 .session
4447 .initial_keys(remote_cid, self.side.side()),
4448 );
4449 crypto_space.crypto_offset = client_hello.len() as u64;
4450
4451 let next_pn = self.spaces[SpaceId::Initial]
4452 .for_path(path_id)
4453 .next_packet_number;
4454 self.spaces[SpaceId::Initial] = {
4455 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
4456 space.for_path(path_id).next_packet_number = next_pn;
4457 space.pending.crypto.push_back(frame::Crypto {
4458 offset: 0,
4459 data: client_hello,
4460 });
4461 space
4462 };
4463
4464 let zero_rtt = mem::take(
4466 &mut self.spaces[SpaceId::Data]
4467 .for_path(PathId::ZERO)
4468 .sent_packets,
4469 );
4470 for (_, info) in zero_rtt.into_iter() {
4471 self.paths
4472 .get_mut(&PathId::ZERO)
4473 .unwrap()
4474 .remove_in_flight(&info);
4475 self.spaces[SpaceId::Data].pending |= info.retransmits;
4476 }
4477 self.streams.retransmit_all_for_0rtt();
4478
4479 let token_len = packet.payload.len() - 16;
4480 let ConnectionSide::Client { ref mut token, .. } = self.side else {
4481 unreachable!("we already short-circuited if we're server");
4482 };
4483 *token = packet.payload.freeze().split_to(token_len);
4484
4485 self.state = State::handshake(state::Handshake {
4486 expected_token: Bytes::new(),
4487 remote_cid_set: false,
4488 client_hello: None,
4489 allow_server_migration: true,
4490 });
4491 Ok(())
4492 }
4493 Header::Long {
4494 ty: LongType::Handshake,
4495 src_cid: remote_cid,
4496 dst_cid: local_cid,
4497 ..
4498 } => {
4499 debug_assert_eq!(path_id, PathId::ZERO);
4500 if remote_cid != self.remote_handshake_cid {
4501 debug!(
4502 "discarding packet with mismatched remote CID: {} != {}",
4503 self.remote_handshake_cid, remote_cid
4504 );
4505 return Ok(());
4506 }
4507 self.on_path_validated(path_id);
4508
4509 self.process_early_payload(now, path_id, packet, qlog)?;
4510 if self.state.is_closed() {
4511 return Ok(());
4512 }
4513
4514 if self.crypto_state.session.is_handshaking() {
4515 trace!("handshake ongoing");
4516 return Ok(());
4517 }
4518
4519 if self.side.is_client() {
4520 let params = self
4522 .crypto_state
4523 .session
4524 .transport_parameters()?
4525 .ok_or_else(|| {
4526 TransportError::new(
4527 TransportErrorCode::crypto(0x6d),
4528 "transport parameters missing".to_owned(),
4529 )
4530 })?;
4531
4532 if self.has_0rtt() {
4533 if !self.crypto_state.session.early_data_accepted().unwrap() {
4534 debug_assert!(self.side.is_client());
4535 debug!("0-RTT rejected");
4536 self.crypto_state.accepted_0rtt = false;
4537 self.streams.zero_rtt_rejected();
4538
4539 self.spaces[SpaceId::Data].pending = Retransmits::default();
4541
4542 let sent_packets = mem::take(
4544 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4545 );
4546 for (_, packet) in sent_packets.into_iter() {
4547 self.paths
4548 .get_mut(&path_id)
4549 .unwrap()
4550 .remove_in_flight(&packet);
4551 }
4552 } else {
4553 self.crypto_state.accepted_0rtt = true;
4554 params.validate_resumption_from(&self.peer_params)?;
4555 }
4556 }
4557 if let Some(token) = params.stateless_reset_token {
4558 let remote = self.path_data(path_id).network_path.remote;
4559 debug_assert!(!self.state.is_drained()); self.endpoint_events
4561 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4562 }
4563 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4564 self.issue_first_cids(now);
4565 } else {
4566 self.spaces[SpaceId::Data].pending.handshake_done = true;
4568 self.discard_space(now, SpaceKind::Handshake);
4569 self.events.push_back(Event::HandshakeConfirmed);
4570 trace!("handshake confirmed");
4571 }
4572
4573 self.events.push_back(Event::Connected);
4574 self.state.move_to_established();
4575 trace!("established");
4576
4577 self.issue_first_path_cids(now);
4580 Ok(())
4581 }
4582 Header::Initial(InitialHeader {
4583 src_cid: remote_cid,
4584 dst_cid: local_cid,
4585 ..
4586 }) => {
4587 debug_assert_eq!(path_id, PathId::ZERO);
4588 if !state.remote_cid_set {
4589 trace!("switching remote CID to {}", remote_cid);
4590 let mut state = state.clone();
4591 self.remote_cids
4592 .get_mut(&path_id)
4593 .expect("PathId::ZERO not yet abandoned")
4594 .update_initial_cid(remote_cid);
4595 self.remote_handshake_cid = remote_cid;
4596 self.original_remote_cid = remote_cid;
4597 state.remote_cid_set = true;
4598 self.state.move_to_handshake(state);
4599 } else if remote_cid != self.remote_handshake_cid {
4600 debug!(
4601 "discarding packet with mismatched remote CID: {} != {}",
4602 self.remote_handshake_cid, remote_cid
4603 );
4604 return Ok(());
4605 }
4606
4607 let starting_space = self.highest_space;
4608 self.process_early_payload(now, path_id, packet, qlog)?;
4609
4610 if self.side.is_server()
4611 && starting_space == SpaceKind::Initial
4612 && self.highest_space != SpaceKind::Initial
4613 {
4614 let params = self
4615 .crypto_state
4616 .session
4617 .transport_parameters()?
4618 .ok_or_else(|| {
4619 TransportError::new(
4620 TransportErrorCode::crypto(0x6d),
4621 "transport parameters missing".to_owned(),
4622 )
4623 })?;
4624 self.handle_peer_params(params, local_cid, remote_cid, now)?;
4625 self.issue_first_cids(now);
4626 self.init_0rtt(now);
4627 }
4628 Ok(())
4629 }
4630 Header::Long {
4631 ty: LongType::ZeroRtt,
4632 ..
4633 } => {
4634 self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4635 Ok(())
4636 }
4637 Header::VersionNegotiate { .. } => {
4638 if self.total_authed_packets > 1 {
4639 return Ok(());
4640 }
4641 let supported = packet
4642 .payload
4643 .chunks(4)
4644 .any(|x| match <[u8; 4]>::try_from(x) {
4645 Ok(version) => self.version == u32::from_be_bytes(version),
4646 Err(_) => false,
4647 });
4648 if supported {
4649 return Ok(());
4650 }
4651 debug!("remote doesn't support our version");
4652 Err(ConnectionError::VersionMismatch)
4653 }
4654 Header::Short { .. } => unreachable!(
4655 "short packets received during handshake are discarded in handle_packet"
4656 ),
4657 }
4658 }
4659
4660 fn process_early_payload(
4662 &mut self,
4663 now: Instant,
4664 path_id: PathId,
4665 packet: Packet,
4666 #[allow(unused)] qlog: &mut QlogRecvPacket,
4667 ) -> Result<(), TransportError> {
4668 debug_assert_ne!(packet.header.space(), SpaceKind::Data);
4669 debug_assert_eq!(path_id, PathId::ZERO);
4670 let payload_len = packet.payload.len();
4671 let mut ack_eliciting = false;
4672 for result in frame::Iter::new(packet.payload.freeze())? {
4673 let frame = result?;
4674 qlog.frame(&frame);
4675 let span = match frame {
4676 Frame::Padding => continue,
4677 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4678 };
4679
4680 self.path_stats
4681 .for_path(path_id)
4682 .frame_rx
4683 .record(frame.ty());
4684
4685 let _guard = span.as_ref().map(|x| x.enter());
4686 ack_eliciting |= frame.is_ack_eliciting();
4687
4688 if frame.is_1rtt() && packet.header.space() != SpaceKind::Data {
4690 return Err(TransportError::PROTOCOL_VIOLATION(
4691 "illegal frame type in handshake",
4692 ));
4693 }
4694
4695 match frame {
4696 Frame::Padding | Frame::Ping => {}
4697 Frame::Crypto(frame) => {
4698 self.read_crypto(packet.header.space().into(), &frame, payload_len)?;
4699 }
4700 Frame::Ack(ack) => {
4701 self.on_ack_received(now, packet.header.space().into(), ack)?;
4702 }
4703 Frame::PathAck(ack) => {
4704 span.as_ref()
4705 .map(|span| span.record("path", tracing::field::display(&ack.path_id)));
4706 self.on_path_ack_received(now, packet.header.space().into(), ack)?;
4707 }
4708 Frame::Close(reason) => {
4709 self.state.move_to_draining(Some(reason.into()));
4710 return Ok(());
4711 }
4712 _ => {
4713 let mut err =
4714 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4715 err.frame = frame::MaybeFrame::Known(frame.ty());
4716 return Err(err);
4717 }
4718 }
4719 }
4720
4721 if ack_eliciting {
4722 self.spaces[packet.header.space()]
4724 .for_path(path_id)
4725 .pending_acks
4726 .set_immediate_ack_required();
4727 }
4728
4729 self.write_crypto();
4730 Ok(())
4731 }
4732
4733 fn process_payload(
4735 &mut self,
4736 now: Instant,
4737 network_path: FourTuple,
4738 path_id: PathId,
4739 number: u64,
4740 packet: Packet,
4741 #[allow(unused)] qlog: &mut QlogRecvPacket,
4742 ) -> Result<(), TransportError> {
4743 let is_multipath_negotiated = self.is_multipath_negotiated();
4744 let payload = packet.payload.freeze();
4745 let mut is_probing_packet = true;
4746 let mut close = None;
4747 let payload_len = payload.len();
4748 let mut ack_eliciting = false;
4749 let mut migration_observed_addr = None;
4752 for result in frame::Iter::new(payload)? {
4753 let frame = result?;
4754 qlog.frame(&frame);
4755 let span = match frame {
4756 Frame::Padding => continue,
4757 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4758 };
4759
4760 self.path_stats
4761 .for_path(path_id)
4762 .frame_rx
4763 .record(frame.ty());
4764 match &frame {
4767 Frame::Crypto(f) => {
4768 trace!(offset = f.offset, len = f.data.len(), "got frame CRYPTO");
4769 }
4770 Frame::Stream(f) => {
4771 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got frame STREAM");
4772 }
4773 Frame::Datagram(f) => {
4774 trace!(len = f.data.len(), "got frame DATAGRAM");
4775 }
4776 f => {
4777 trace!("got frame {f}");
4778 }
4779 }
4780
4781 let _guard = span.enter();
4782 if packet.header.is_0rtt() {
4783 match frame {
4784 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4785 return Err(TransportError::PROTOCOL_VIOLATION(
4786 "illegal frame type in 0-RTT",
4787 ));
4788 }
4789 _ => {
4790 if frame.is_1rtt() {
4791 return Err(TransportError::PROTOCOL_VIOLATION(
4792 "illegal frame type in 0-RTT",
4793 ));
4794 }
4795 }
4796 }
4797 }
4798 ack_eliciting |= frame.is_ack_eliciting();
4799
4800 match frame {
4802 Frame::Padding
4803 | Frame::PathChallenge(_)
4804 | Frame::PathResponse(_)
4805 | Frame::NewConnectionId(_)
4806 | Frame::ObservedAddr(_) => {}
4807 _ => {
4808 is_probing_packet = false;
4809 }
4810 }
4811
4812 match frame {
4813 Frame::Crypto(frame) => {
4814 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4815 }
4816 Frame::Stream(frame) => {
4817 if self.streams.received(frame, payload_len)?.should_transmit() {
4818 self.spaces[SpaceId::Data].pending.max_data = true;
4819 }
4820 }
4821 Frame::Ack(ack) => {
4822 self.on_ack_received(now, SpaceId::Data, ack)?;
4823 }
4824 Frame::PathAck(ack) => {
4825 if !self.is_multipath_negotiated() {
4826 return Err(TransportError::PROTOCOL_VIOLATION(
4827 "received PATH_ACK frame when multipath was not negotiated",
4828 ));
4829 }
4830 span.record("path", tracing::field::display(&ack.path_id));
4831 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4832 }
4833 Frame::Padding | Frame::Ping => {}
4834 Frame::Close(reason) => {
4835 close = Some(reason);
4836 }
4837 Frame::PathChallenge(challenge) => {
4838 let path = &mut self
4839 .path_mut(path_id)
4840 .expect("payload is processed only after the path becomes known");
4841 path.path_responses.push(number, challenge.0, network_path);
4842 if network_path == path.network_path {
4845 match self.peer_supports_ack_frequency() {
4855 true => self.immediate_ack(path_id),
4856 false => {
4857 self.ping_path(path_id).ok();
4858 }
4859 }
4860 }
4861 }
4862 Frame::PathResponse(response) => {
4863 let path = self
4864 .paths
4865 .get_mut(&path_id)
4866 .expect("payload is processed only after the path becomes known");
4867
4868 use PathTimer::*;
4869 use paths::OnPathResponseReceived::*;
4870 match path
4871 .data
4872 .on_path_response_received(now, response.0, network_path)
4873 {
4874 OnPath { was_open } => {
4875 let qlog = self.qlog.with_time(now);
4876
4877 self.timers
4878 .stop(Timer::PerPath(path_id, PathValidationFailed), qlog.clone());
4879 self.timers
4880 .stop(Timer::PerPath(path_id, AbandonFromValidation), qlog.clone());
4881
4882 let next_challenge = path
4883 .data
4884 .earliest_on_path_expiring_challenge()
4885 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4886 self.timers.set_or_stop(
4887 Timer::PerPath(path_id, PathChallengeLost),
4888 next_challenge,
4889 qlog,
4890 );
4891
4892 if !was_open {
4893 if is_multipath_negotiated {
4894 self.events
4895 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4896 }
4897 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4898 {
4899 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4900 id: path_id,
4901 addr: observed.socket_addr(),
4902 }));
4903 }
4904 }
4905 if let Some((_, ref mut prev)) = path.prev {
4906 prev.reset_on_path_challenges();
4907 }
4908 }
4909 OffPath => {
4910 debug!(%response, "Valid response to off-path PATH_CHALLENGE");
4911 }
4912 Ignored {
4913 sent_on,
4914 current_path,
4915 } => {
4916 debug!(%sent_on, %current_path, %response, "ignoring valid PATH_RESPONSE")
4917 }
4918 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4919 }
4920 }
4921 Frame::MaxData(frame::MaxData(bytes)) => {
4922 self.streams.received_max_data(bytes);
4923 }
4924 Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
4925 self.streams.received_max_stream_data(id, offset)?;
4926 }
4927 Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
4928 self.streams.received_max_streams(dir, count)?;
4929 }
4930 Frame::ResetStream(frame) => {
4931 if self.streams.received_reset(frame)?.should_transmit() {
4932 self.spaces[SpaceId::Data].pending.max_data = true;
4933 }
4934 }
4935 Frame::DataBlocked(DataBlocked(offset)) => {
4936 debug!(offset, "peer claims to be blocked at connection level");
4937 }
4938 Frame::StreamDataBlocked(StreamDataBlocked { id, offset }) => {
4939 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4940 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4941 return Err(TransportError::STREAM_STATE_ERROR(
4942 "STREAM_DATA_BLOCKED on send-only stream",
4943 ));
4944 }
4945 debug!(
4946 stream = %id,
4947 offset, "peer claims to be blocked at stream level"
4948 );
4949 }
4950 Frame::StreamsBlocked(StreamsBlocked { dir, limit }) => {
4951 if limit > MAX_STREAM_COUNT {
4952 return Err(TransportError::FRAME_ENCODING_ERROR(
4953 "unrepresentable stream limit",
4954 ));
4955 }
4956 debug!(
4957 "peer claims to be blocked opening more than {} {} streams",
4958 limit, dir
4959 );
4960 }
4961 Frame::StopSending(frame::StopSending { id, error_code }) => {
4962 if id.initiator() != self.side.side() {
4963 if id.dir() == Dir::Uni {
4964 debug!("got STOP_SENDING on recv-only {}", id);
4965 return Err(TransportError::STREAM_STATE_ERROR(
4966 "STOP_SENDING on recv-only stream",
4967 ));
4968 }
4969 } else if self.streams.is_local_unopened(id) {
4970 return Err(TransportError::STREAM_STATE_ERROR(
4971 "STOP_SENDING on unopened stream",
4972 ));
4973 }
4974 self.streams.received_stop_sending(id, error_code);
4975 }
4976 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4977 if let Some(ref path_id) = path_id {
4978 span.record("path", tracing::field::display(&path_id));
4979 }
4980 let path_id = path_id.unwrap_or_default();
4981 match self.local_cid_state.get_mut(&path_id) {
4982 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4983 Some(cid_state) => {
4984 let allow_more_cids = cid_state
4985 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4986
4987 let has_path = !self.abandoned_paths.contains(&path_id);
4991 let allow_more_cids = allow_more_cids && has_path;
4992
4993 debug_assert!(!self.state.is_drained()); self.endpoint_events
4995 .push_back(EndpointEventInner::RetireConnectionId(
4996 now,
4997 path_id,
4998 sequence,
4999 allow_more_cids,
5000 ));
5001 }
5002 }
5003 }
5004 Frame::NewConnectionId(frame) => {
5005 let path_id = if let Some(path_id) = frame.path_id {
5006 if !self.is_multipath_negotiated() {
5007 return Err(TransportError::PROTOCOL_VIOLATION(
5008 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
5009 ));
5010 }
5011 if path_id > self.local_max_path_id {
5012 return Err(TransportError::PROTOCOL_VIOLATION(
5013 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
5014 ));
5015 }
5016 path_id
5017 } else {
5018 PathId::ZERO
5019 };
5020
5021 if let Some(ref path_id) = frame.path_id {
5022 span.record("path", tracing::field::display(&path_id));
5023 }
5024
5025 if self.abandoned_paths.contains(&path_id) {
5026 trace!("ignoring issued CID for abandoned path");
5027 continue;
5028 }
5029 let remote_cids = self
5030 .remote_cids
5031 .entry(path_id)
5032 .or_insert_with(|| CidQueue::new(frame.id));
5033 if remote_cids.active().is_empty() {
5034 return Err(TransportError::PROTOCOL_VIOLATION(
5035 "NEW_CONNECTION_ID when CIDs aren't in use",
5036 ));
5037 }
5038 if frame.retire_prior_to > frame.sequence {
5039 return Err(TransportError::PROTOCOL_VIOLATION(
5040 "NEW_CONNECTION_ID retiring unissued CIDs",
5041 ));
5042 }
5043
5044 use crate::cid_queue::InsertError;
5045 match remote_cids.insert(frame) {
5046 Ok(None) if self.path(path_id).is_none() => {
5047 self.continue_nat_traversal_round(now);
5050 }
5051 Ok(None) => {}
5052 Ok(Some((retired, reset_token))) => {
5053 let pending_retired =
5054 &mut self.spaces[SpaceId::Data].pending.retire_cids;
5055 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
5058 if (pending_retired.len() as u64)
5061 .saturating_add(retired.end.saturating_sub(retired.start))
5062 > MAX_PENDING_RETIRED_CIDS
5063 {
5064 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
5065 "queued too many retired CIDs",
5066 ));
5067 }
5068 pending_retired.extend(retired.map(|seq| (path_id, seq)));
5069 self.set_reset_token(path_id, network_path.remote, reset_token);
5070 }
5071 Err(InsertError::ExceedsLimit) => {
5072 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
5073 }
5074 Err(InsertError::Retired) => {
5075 trace!("discarding already-retired");
5076 self.spaces[SpaceId::Data]
5080 .pending
5081 .retire_cids
5082 .push((path_id, frame.sequence));
5083 continue;
5084 }
5085 };
5086
5087 if self.side.is_server()
5088 && path_id == PathId::ZERO
5089 && self
5090 .remote_cids
5091 .get(&PathId::ZERO)
5092 .map(|cids| cids.active_seq() == 0)
5093 .unwrap_or_default()
5094 {
5095 self.update_remote_cid(PathId::ZERO);
5098 }
5099 }
5100 Frame::NewToken(NewToken { token }) => {
5101 let ConnectionSide::Client {
5102 token_store,
5103 server_name,
5104 ..
5105 } = &self.side
5106 else {
5107 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
5108 };
5109 if token.is_empty() {
5110 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
5111 }
5112 trace!("got new token");
5113 token_store.insert(server_name, token);
5114 }
5115 Frame::Datagram(datagram) => {
5116 if self
5117 .datagrams
5118 .received(datagram, &self.config.datagram_receive_buffer_size)?
5119 {
5120 self.events.push_back(Event::DatagramReceived);
5121 }
5122 }
5123 Frame::AckFrequency(ack_frequency) => {
5124 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
5127 continue;
5130 }
5131
5132 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
5134 space.pending_acks.set_ack_frequency_params(&ack_frequency);
5135
5136 if !self.abandoned_paths.contains(path_id)
5140 && let Some(timeout) = space
5141 .pending_acks
5142 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
5143 {
5144 self.timers.set(
5145 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
5146 timeout,
5147 self.qlog.with_time(now),
5148 );
5149 }
5150 }
5151 }
5152 Frame::ImmediateAck => {
5153 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
5155 pns.pending_acks.set_immediate_ack_required();
5156 }
5157 }
5158 Frame::HandshakeDone => {
5159 if self.side.is_server() {
5160 return Err(TransportError::PROTOCOL_VIOLATION(
5161 "client sent HANDSHAKE_DONE",
5162 ));
5163 }
5164 if self.crypto_state.has_keys(EncryptionLevel::Handshake) {
5165 self.discard_space(now, SpaceKind::Handshake);
5166 self.events.push_back(Event::HandshakeConfirmed);
5167 trace!("handshake confirmed");
5168 }
5169 }
5170 Frame::ObservedAddr(observed) => {
5171 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
5173 if !self
5174 .peer_params
5175 .address_discovery_role
5176 .should_report(&self.config.address_discovery_role)
5177 {
5178 return Err(TransportError::PROTOCOL_VIOLATION(
5179 "received OBSERVED_ADDRESS frame when not negotiated",
5180 ));
5181 }
5182 if packet.header.space() != SpaceKind::Data {
5184 return Err(TransportError::PROTOCOL_VIOLATION(
5185 "OBSERVED_ADDRESS frame outside data space",
5186 ));
5187 }
5188
5189 let path = self.path_data_mut(path_id);
5190 if network_path == path.network_path {
5191 if let Some(updated) = path.update_observed_addr_report(observed)
5192 && path.open_status == paths::OpenStatus::Informed
5193 {
5194 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5195 id: path_id,
5196 addr: updated,
5197 }));
5198 }
5200 } else {
5201 migration_observed_addr = Some(observed)
5203 }
5204 }
5205 Frame::PathAbandon(frame::PathAbandon {
5206 path_id,
5207 error_code,
5208 }) => {
5209 span.record("path", tracing::field::display(&path_id));
5210 match self.close_path_inner(
5211 now,
5212 path_id,
5213 PathAbandonReason::RemoteAbandoned {
5214 error_code: error_code.into(),
5215 },
5216 ) {
5217 Ok(()) => {
5218 trace!("peer abandoned path");
5219 }
5220 Err(ClosePathError::ClosedPath) => {
5221 trace!("peer abandoned already closed path");
5222 }
5223 Err(ClosePathError::MultipathNotNegotiated) => {
5224 return Err(TransportError::PROTOCOL_VIOLATION(
5225 "received PATH_ABANDON frame when multipath was not negotiated",
5226 ));
5227 }
5228 Err(ClosePathError::LastOpenPath) => {
5229 error!(
5232 "peer abandoned last path but close_path_inner returned LastOpenPath"
5233 );
5234 }
5235 };
5236
5237 if let Some(path) = self.paths.get_mut(&path_id)
5239 && !mem::replace(&mut path.data.draining, true)
5240 {
5241 let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5242 let pto = path.data.rtt.pto_base() + ack_delay;
5243 self.timers.set(
5244 Timer::PerPath(path_id, PathTimer::PathDrained),
5245 now + 3 * pto,
5246 self.qlog.with_time(now),
5247 );
5248
5249 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
5250 }
5251 }
5252 Frame::PathStatusAvailable(info) => {
5253 span.record("path", tracing::field::display(&info.path_id));
5254 if self.is_multipath_negotiated() {
5255 self.on_path_status(
5256 info.path_id,
5257 PathStatus::Available,
5258 info.status_seq_no,
5259 );
5260 } else {
5261 return Err(TransportError::PROTOCOL_VIOLATION(
5262 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
5263 ));
5264 }
5265 }
5266 Frame::PathStatusBackup(info) => {
5267 span.record("path", tracing::field::display(&info.path_id));
5268 if self.is_multipath_negotiated() {
5269 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
5270 } else {
5271 return Err(TransportError::PROTOCOL_VIOLATION(
5272 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
5273 ));
5274 }
5275 }
5276 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
5277 span.record("path", tracing::field::display(&path_id));
5278 if !self.is_multipath_negotiated() {
5279 return Err(TransportError::PROTOCOL_VIOLATION(
5280 "received MAX_PATH_ID frame when multipath was not negotiated",
5281 ));
5282 }
5283 if path_id > self.remote_max_path_id {
5285 self.remote_max_path_id = path_id;
5286 self.issue_first_path_cids(now);
5287 while let Some(true) = self.continue_nat_traversal_round(now) {}
5288 }
5289 }
5290 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
5291 if self.is_multipath_negotiated() {
5295 if max_path_id > self.local_max_path_id {
5296 return Err(TransportError::PROTOCOL_VIOLATION(
5297 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
5298 ));
5299 }
5300 debug!("received PATHS_BLOCKED({:?})", max_path_id);
5301 } else {
5303 return Err(TransportError::PROTOCOL_VIOLATION(
5304 "received PATHS_BLOCKED frame when not multipath was not negotiated",
5305 ));
5306 }
5307 }
5308 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
5309 if self.is_multipath_negotiated() {
5317 if path_id > self.local_max_path_id {
5318 return Err(TransportError::PROTOCOL_VIOLATION(
5319 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
5320 ));
5321 }
5322 if next_seq.0
5323 > self
5324 .local_cid_state
5325 .get(&path_id)
5326 .map(|cid_state| cid_state.active_seq().1 + 1)
5327 .unwrap_or_default()
5328 {
5329 return Err(TransportError::PROTOCOL_VIOLATION(
5330 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
5331 ));
5332 }
5333 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
5334 } else {
5335 return Err(TransportError::PROTOCOL_VIOLATION(
5336 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
5337 ));
5338 }
5339 }
5340 Frame::AddAddress(addr) => {
5341 let client_state = match self.n0_nat_traversal.client_side_mut() {
5342 Ok(state) => state,
5343 Err(err) => {
5344 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5345 "Nat traversal(ADD_ADDRESS): {err}"
5346 )));
5347 }
5348 };
5349
5350 if !client_state.check_remote_address(&addr) {
5351 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
5353 }
5354
5355 match client_state.add_remote_address(addr) {
5356 Ok(maybe_added) => {
5357 if let Some(added) = maybe_added {
5358 self.events.push_back(Event::NatTraversal(
5359 n0_nat_traversal::Event::AddressAdded(added),
5360 ));
5361 }
5362 }
5363 Err(e) => {
5364 warn!(%e, "failed to add remote address")
5365 }
5366 }
5367 }
5368 Frame::RemoveAddress(addr) => {
5369 let client_state = match self.n0_nat_traversal.client_side_mut() {
5370 Ok(state) => state,
5371 Err(err) => {
5372 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5373 "Nat traversal(REMOVE_ADDRESS): {err}"
5374 )));
5375 }
5376 };
5377 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
5378 self.events.push_back(Event::NatTraversal(
5379 n0_nat_traversal::Event::AddressRemoved(removed_addr),
5380 ));
5381 }
5382 }
5383 Frame::ReachOut(reach_out) => {
5384 let ipv6 = self.is_ipv6();
5385 let server_state = match self.n0_nat_traversal.server_side_mut() {
5386 Ok(state) => state,
5387 Err(err) => {
5388 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5389 "Nat traversal(REACH_OUT): {err}"
5390 )));
5391 }
5392 };
5393
5394 let round_before = server_state.current_round();
5395
5396 if let Err(err) = server_state.handle_reach_out(reach_out, ipv6) {
5397 return Err(TransportError::PROTOCOL_VIOLATION(format!(
5398 "Nat traversal(REACH_OUT): {err}"
5399 )));
5400 }
5401
5402 let round_advanced = server_state.current_round() > round_before;
5405 if round_advanced {
5406 self.timers.stop(
5407 Timer::Conn(ConnTimer::NatTraversalProbeRetry),
5408 self.qlog.with_time(now),
5409 );
5410
5411 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5412 if path.has_off_path_challenges() {
5413 trace!(
5414 "clearing stale off-path challenges for new NAT traversal round"
5415 );
5416 path.clear_off_path_challenges();
5417
5418 if let Some(remote_cids) = self.remote_cids.get_mut(&path_id)
5419 && let Some((reset_token, retired)) = remote_cids.next()
5420 {
5421 self.spaces[SpaceId::Data]
5422 .pending
5423 .retire_cids
5424 .extend(retired.map(|seq| (path_id, seq)));
5425 let remote = self
5426 .paths
5427 .get(&path_id)
5428 .expect("known path")
5429 .data
5430 .network_path
5431 .remote;
5432 self.set_reset_token(path_id, remote, reset_token);
5433 }
5434 }
5435 }
5436 }
5437 }
5438 }
5439
5440 let space = self.spaces[SpaceId::Data].for_path(path_id);
5441 if space
5442 .pending_acks
5443 .packet_received(now, number, ack_eliciting, &space.dedup)
5444 {
5445 if self.abandoned_paths.contains(&path_id) {
5446 space.pending_acks.set_immediate_ack_required();
5449 } else {
5450 self.timers.set(
5451 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
5452 now + self.ack_frequency.max_ack_delay,
5453 self.qlog.with_time(now),
5454 );
5455 }
5456 }
5457
5458 let pending = &mut self.spaces[SpaceId::Data].pending;
5463 self.streams.queue_max_stream_id(pending);
5464
5465 if let Some(reason) = close {
5466 self.state.move_to_draining(Some(reason.into()));
5467 self.connection_close_pending = true;
5468 }
5469
5470 if Some(number)
5471 == self.spaces[SpaceId::Data]
5472 .for_path(path_id)
5473 .largest_received_packet_number
5474 && !is_probing_packet
5475 && network_path != self.path_data(path_id).network_path
5476 {
5477 let ConnectionSide::Server { ref server_config } = self.side else {
5478 panic!("packets from unknown remote should be dropped by clients");
5479 };
5480 debug_assert!(
5481 server_config.migration,
5482 "migration-initiating packets should have been dropped immediately"
5483 );
5484 self.migrate(path_id, now, network_path, migration_observed_addr);
5485 self.update_remote_cid(path_id);
5487 self.spin = false;
5488 }
5489
5490 Ok(())
5491 }
5492
5493 fn migrate(
5494 &mut self,
5495 path_id: PathId,
5496 now: Instant,
5497 network_path: FourTuple,
5498 observed_addr: Option<ObservedAddr>,
5499 ) {
5500 trace!(%network_path, %path_id, "migration initiated");
5501 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
5502 let prev_pto = self.pto(SpaceKind::Data, path_id);
5509 let path = self.paths.get_mut(&path_id).expect("known path");
5510 let mut new_path_data = if network_path.remote.is_ipv4()
5511 && network_path.remote.ip() == path.data.network_path.remote.ip()
5512 {
5513 PathData::from_previous(network_path, &path.data, self.path_generation_counter, now)
5514 } else {
5515 let peer_max_udp_payload_size =
5516 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
5517 .unwrap_or(u16::MAX);
5518 PathData::new(
5519 network_path,
5520 self.allow_mtud,
5521 Some(peer_max_udp_payload_size),
5522 self.path_generation_counter,
5523 now,
5524 &self.config,
5525 )
5526 };
5527 new_path_data.last_observed_addr_report = path.data.last_observed_addr_report.clone();
5528 if let Some(report) = observed_addr
5529 && let Some(updated) = new_path_data.update_observed_addr_report(report)
5530 {
5531 tracing::info!("adding observed addr event from migration");
5532 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
5533 id: path_id,
5534 addr: updated,
5535 }));
5536 }
5537 new_path_data.pending_on_path_challenge = true;
5538
5539 let mut prev_path_data = mem::replace(&mut path.data, new_path_data);
5540
5541 if !prev_path_data.validated
5550 && let Some(cid) = self.remote_cids.get(&path_id).map(CidQueue::active)
5551 {
5552 prev_path_data.pending_on_path_challenge = true;
5553 path.prev = Some((cid, prev_path_data));
5556 }
5557
5558 self.qlog.emit_tuple_assigned(path_id, network_path, now);
5560
5561 self.timers.set(
5562 Timer::PerPath(path_id, PathTimer::PathValidationFailed),
5563 now + 3 * cmp::max(self.pto(SpaceKind::Data, path_id), prev_pto),
5564 self.qlog.with_time(now),
5565 );
5566 }
5567
5568 pub fn handle_network_change(&mut self, hint: Option<&dyn NetworkChangeHint>, now: Instant) {
5585 debug!("network changed");
5586 if self.state.is_drained() {
5587 return;
5588 }
5589 if self.highest_space < SpaceKind::Data {
5590 for path in self.paths.values_mut() {
5591 path.data.network_path.local_ip = None;
5593 }
5594
5595 self.update_remote_cid(PathId::ZERO);
5596 self.ping();
5597
5598 return;
5599 }
5600
5601 let mut non_recoverable_paths = Vec::default();
5604 let mut recoverable_paths = Vec::default();
5605 let mut open_paths = 0;
5606
5607 let is_multipath_negotiated = self.is_multipath_negotiated();
5608 let is_client = self.side().is_client();
5609 let immediate_ack_allowed = self.peer_supports_ack_frequency();
5610
5611 for (path_id, path) in self.paths.iter_mut() {
5612 if self.abandoned_paths.contains(path_id) {
5613 continue;
5614 }
5615 open_paths += 1;
5616
5617 let network_path = path.data.network_path;
5620
5621 path.data.network_path.local_ip = None;
5624 let remote = network_path.remote;
5625
5626 let attempt_to_recover = if is_multipath_negotiated {
5630 hint.map(|h| h.is_path_recoverable(*path_id, network_path))
5634 .unwrap_or(!is_client)
5635 } else {
5636 true
5638 };
5639
5640 if attempt_to_recover {
5641 recoverable_paths.push((*path_id, remote));
5642 } else {
5643 non_recoverable_paths.push((*path_id, remote, path.data.local_status()))
5644 }
5645 }
5646
5647 let open_first = open_paths == non_recoverable_paths.len();
5656
5657 for (path_id, remote, status) in non_recoverable_paths.into_iter() {
5658 let network_path = FourTuple {
5659 remote,
5660 local_ip: None, };
5662
5663 if open_first && let Err(e) = self.open_path(network_path, status, now) {
5664 debug!(%e,"Failed to open new path for network change");
5665 recoverable_paths.push((path_id, remote));
5667 continue;
5668 }
5669
5670 if let Err(e) =
5671 self.close_path_inner(now, path_id, PathAbandonReason::UnusableAfterNetworkChange)
5672 {
5673 debug!(%e,"Failed to close unrecoverable path after network change");
5674 recoverable_paths.push((path_id, remote));
5675 continue;
5676 }
5677
5678 if !open_first && let Err(e) = self.open_path(network_path, status, now) {
5679 debug!(%e,"Failed to open new path for network change");
5683 }
5684 }
5685
5686 for (path_id, remote) in recoverable_paths.into_iter() {
5689 if let Some(path_space) = self.spaces[SpaceId::Data].number_spaces.get_mut(&path_id) {
5691 path_space.ping_pending = true;
5692
5693 if immediate_ack_allowed {
5694 path_space.immediate_ack_pending = true;
5695 }
5696 }
5697
5698 if let Some(path) = self.paths.get_mut(&path_id) {
5703 path.data.pto_count = 0;
5704 }
5705 self.set_loss_detection_timer(now, path_id);
5706
5707 let Some((reset_token, retired)) =
5708 self.remote_cids.get_mut(&path_id).and_then(CidQueue::next)
5709 else {
5710 continue;
5711 };
5712
5713 self.spaces[SpaceId::Data]
5715 .pending
5716 .retire_cids
5717 .extend(retired.map(|seq| (path_id, seq)));
5718
5719 debug_assert!(!self.state.is_drained()); self.endpoint_events
5721 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5722 }
5723 }
5724
5725 fn update_remote_cid(&mut self, path_id: PathId) {
5727 let Some((reset_token, retired)) = self
5728 .remote_cids
5729 .get_mut(&path_id)
5730 .and_then(|cids| cids.next())
5731 else {
5732 return;
5733 };
5734
5735 self.spaces[SpaceId::Data]
5737 .pending
5738 .retire_cids
5739 .extend(retired.map(|seq| (path_id, seq)));
5740 let remote = self.path_data(path_id).network_path.remote;
5741 self.set_reset_token(path_id, remote, reset_token);
5742 }
5743
5744 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5753 debug_assert!(!self.state.is_drained()); self.endpoint_events
5755 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5756
5757 if path_id == PathId::ZERO {
5763 self.peer_params.stateless_reset_token = Some(reset_token);
5764 }
5765 }
5766
5767 fn issue_first_cids(&mut self, now: Instant) {
5769 if self
5770 .local_cid_state
5771 .get(&PathId::ZERO)
5772 .expect("PathId::ZERO exists when the connection is created")
5773 .cid_len()
5774 == 0
5775 {
5776 return;
5777 }
5778
5779 let mut n = self.peer_params.issue_cids_limit() - 1;
5781 if let ConnectionSide::Server { server_config } = &self.side
5782 && server_config.has_preferred_address()
5783 {
5784 n -= 1;
5786 }
5787 debug_assert!(!self.state.is_drained()); self.endpoint_events
5789 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5790 }
5791
5792 fn issue_first_path_cids(&mut self, now: Instant) {
5796 if let Some(max_path_id) = self.max_path_id() {
5797 let mut path_id = self.max_path_id_with_cids.next();
5798 while path_id <= max_path_id {
5799 self.endpoint_events
5800 .push_back(EndpointEventInner::NeedIdentifiers(
5801 path_id,
5802 now,
5803 self.peer_params.issue_cids_limit(),
5804 ));
5805 path_id = path_id.next();
5806 }
5807 self.max_path_id_with_cids = max_path_id;
5808 }
5809 }
5810
5811 fn populate_packet<'a, 'b>(
5819 &mut self,
5820 now: Instant,
5821 space_id: SpaceId,
5822 path_id: PathId,
5823 scheduling_info: &PathSchedulingInfo,
5824 builder: &mut PacketBuilder<'a, 'b>,
5825 ) {
5826 let is_multipath_negotiated = self.is_multipath_negotiated();
5827 let space_has_keys = self.crypto_state.has_keys(space_id.encryption_level());
5828 let is_0rtt = space_id == SpaceId::Data && !space_has_keys;
5829 let stats = &mut self.path_stats.for_path(path_id).frame_tx;
5830 let space = &mut self.spaces[space_id];
5831 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5832 space
5833 .for_path(path_id)
5834 .pending_acks
5835 .maybe_ack_non_eliciting();
5836
5837 if !is_0rtt
5839 && !scheduling_info.is_abandoned
5840 && scheduling_info.may_send_data
5841 && mem::replace(&mut space.pending.handshake_done, false)
5842 {
5843 builder.write_frame(frame::HandshakeDone, stats);
5844 }
5845
5846 if !scheduling_info.is_abandoned
5848 && mem::replace(&mut space.for_path(path_id).ping_pending, false)
5849 {
5850 builder.write_frame(frame::Ping, stats);
5851 }
5852
5853 if !scheduling_info.is_abandoned
5855 && mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false)
5856 {
5857 debug_assert_eq!(
5858 space_id,
5859 SpaceId::Data,
5860 "immediate acks must be sent in the data space"
5861 );
5862 builder.write_frame(frame::ImmediateAck, stats);
5863 }
5864
5865 if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
5867 for path_id in space
5868 .number_spaces
5869 .iter_mut()
5870 .filter(|(_, pns)| pns.pending_acks.can_send())
5871 .map(|(&path_id, _)| path_id)
5872 .collect::<Vec<_>>()
5873 {
5874 Self::populate_acks(
5875 now,
5876 self.receiving_ecn,
5877 path_id,
5878 space_id,
5879 space,
5880 is_multipath_negotiated,
5881 builder,
5882 stats,
5883 space_has_keys,
5884 );
5885 }
5886 }
5887
5888 if !scheduling_info.is_abandoned
5890 && scheduling_info.may_send_data
5891 && mem::replace(&mut space.pending.ack_frequency, false)
5892 {
5893 let sequence_number = self.ack_frequency.next_sequence_number();
5894
5895 let config = self.config.ack_frequency_config.as_ref().unwrap();
5897
5898 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5900 path.rtt.get(),
5901 config,
5902 &self.peer_params,
5903 );
5904
5905 let frame = frame::AckFrequency {
5906 sequence: sequence_number,
5907 ack_eliciting_threshold: config.ack_eliciting_threshold,
5908 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5909 reordering_threshold: config.reordering_threshold,
5910 };
5911 builder.write_frame(frame, stats);
5912
5913 self.ack_frequency
5914 .ack_frequency_sent(path_id, builder.packet_number, max_ack_delay);
5915 }
5916
5917 if !scheduling_info.is_abandoned
5919 && space_id == SpaceId::Data
5920 && path.pending_on_path_challenge
5921 && !self.state.is_closed()
5922 && builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
5923 {
5925 path.pending_on_path_challenge = false;
5926
5927 let token = self.rng.random();
5928 path.record_path_challenge_sent(now, token, path.network_path);
5929 let challenge = frame::PathChallenge(token);
5931 builder.write_frame(challenge, stats);
5932 builder.require_padding();
5933 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5934 match path.open_status {
5935 paths::OpenStatus::Sent | paths::OpenStatus::Informed => {}
5936 paths::OpenStatus::Pending => {
5937 path.open_status = paths::OpenStatus::Sent;
5938 self.timers.set(
5939 Timer::PerPath(path_id, PathTimer::AbandonFromValidation),
5940 now + 3 * pto,
5941 self.qlog.with_time(now),
5942 );
5943 }
5944 paths::OpenStatus::Revalidating => {
5947 path.open_status = paths::OpenStatus::Informed;
5948 self.timers.set(
5949 Timer::PerPath(path_id, PathTimer::AbandonFromValidation),
5950 now + 3 * pto,
5951 self.qlog.with_time(now),
5952 );
5953 }
5954 }
5955
5956 self.timers.set(
5957 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5958 now + pto,
5959 self.qlog.with_time(now),
5960 );
5961
5962 if is_multipath_negotiated && !path.validated && path.pending_on_path_challenge {
5963 space.pending.path_status.insert(path_id);
5965 }
5966
5967 if space_id == SpaceId::Data
5970 && self
5971 .config
5972 .address_discovery_role
5973 .should_report(&self.peer_params.address_discovery_role)
5974 {
5975 let frame = frame::ObservedAddr::new(
5976 path.network_path.remote,
5977 self.next_observed_addr_seq_no,
5978 );
5979 if builder.frame_space_remaining() > frame.size() {
5980 builder.write_frame(frame, stats);
5981
5982 self.next_observed_addr_seq_no =
5983 self.next_observed_addr_seq_no.saturating_add(1u8);
5984 path.observed_addr_sent = true;
5985
5986 space.pending.observed_addr = false;
5987 }
5988 }
5989 }
5990
5991 if !scheduling_info.is_abandoned
5993 && space_id == SpaceId::Data
5994 && builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
5995 && let Some(token) = path.path_responses.pop_on_path(path.network_path)
5996 {
5997 let response = frame::PathResponse(token);
5998 builder.write_frame(response, stats);
5999 builder.require_padding();
6000
6001 if space_id == SpaceId::Data
6005 && self
6006 .config
6007 .address_discovery_role
6008 .should_report(&self.peer_params.address_discovery_role)
6009 {
6010 let frame = frame::ObservedAddr::new(
6011 path.network_path.remote,
6012 self.next_observed_addr_seq_no,
6013 );
6014 if builder.frame_space_remaining() > frame.size() {
6015 builder.write_frame(frame, stats);
6016
6017 self.next_observed_addr_seq_no =
6018 self.next_observed_addr_seq_no.saturating_add(1u8);
6019 path.observed_addr_sent = true;
6020
6021 space.pending.observed_addr = false;
6022 }
6023 }
6024 }
6025
6026 if !scheduling_info.is_abandoned
6028 && scheduling_info.may_send_data
6029 && let Some((round, addresses)) = space.pending.reach_out.as_mut()
6030 {
6031 while let Some(local_addr) = addresses.iter().next().copied() {
6032 let local_addr = addresses.take(&local_addr).expect("found from iter");
6033 let reach_out = frame::ReachOut::new(*round, local_addr);
6034 if builder.frame_space_remaining() > reach_out.size() {
6035 builder.write_frame(reach_out, stats);
6036 } else {
6037 addresses.insert(local_addr);
6038 break;
6039 }
6040 }
6041 if addresses.is_empty() {
6042 space.pending.reach_out = None;
6043 }
6044 }
6045
6046 if space_id == SpaceId::Data
6048 && scheduling_info.is_abandoned
6049 && scheduling_info.may_self_abandon
6050 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
6051 && let Some(error_code) = space.pending.path_abandon.remove(&path_id)
6052 {
6053 let frame = frame::PathAbandon {
6054 path_id,
6055 error_code,
6056 };
6057 builder.write_frame(frame, stats);
6058
6059 self.remote_cids.remove(&path_id);
6062 }
6063 while space_id == SpaceId::Data
6064 && scheduling_info.may_send_data
6065 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
6066 && let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
6067 {
6068 let frame = frame::PathAbandon {
6069 path_id: abandoned_path_id,
6070 error_code,
6071 };
6072 builder.write_frame(frame, stats);
6073
6074 self.remote_cids.remove(&abandoned_path_id);
6077 }
6078
6079 if !scheduling_info.is_abandoned
6081 && scheduling_info.may_send_data
6082 && space_id == SpaceId::Data
6083 && self
6084 .config
6085 .address_discovery_role
6086 .should_report(&self.peer_params.address_discovery_role)
6087 && (!path.observed_addr_sent || space.pending.observed_addr)
6088 {
6089 let frame =
6090 frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
6091 if builder.frame_space_remaining() > frame.size() {
6092 builder.write_frame(frame, stats);
6093
6094 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
6095 path.observed_addr_sent = true;
6096
6097 space.pending.observed_addr = false;
6098 }
6099 }
6100
6101 while !is_0rtt
6103 && !scheduling_info.is_abandoned
6104 && scheduling_info.may_send_data
6105 && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
6106 {
6107 let Some(mut frame) = space.pending.crypto.pop_front() else {
6108 break;
6109 };
6110
6111 let max_crypto_data_size = builder.frame_space_remaining()
6116 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
6118 - 2; let len = frame
6121 .data
6122 .len()
6123 .min(2usize.pow(14) - 1)
6124 .min(max_crypto_data_size);
6125
6126 let data = frame.data.split_to(len);
6127 let offset = frame.offset;
6128 let truncated = frame::Crypto { offset, data };
6129 builder.write_frame(truncated, stats);
6130
6131 if !frame.data.is_empty() {
6132 frame.offset += len as u64;
6133 space.pending.crypto.push_front(frame);
6134 }
6135 }
6136
6137 while space_id == SpaceId::Data
6139 && !scheduling_info.is_abandoned
6140 && scheduling_info.may_send_data
6141 && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
6142 {
6143 let Some(path_id) = space.pending.path_status.pop_first() else {
6144 break;
6145 };
6146 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
6147 trace!(%path_id, "discarding queued path status for unknown path");
6148 continue;
6149 };
6150
6151 let seq = path.status.seq();
6152 match path.local_status() {
6153 PathStatus::Available => {
6154 let frame = frame::PathStatusAvailable {
6155 path_id,
6156 status_seq_no: seq,
6157 };
6158 builder.write_frame(frame, stats);
6159 }
6160 PathStatus::Backup => {
6161 let frame = frame::PathStatusBackup {
6162 path_id,
6163 status_seq_no: seq,
6164 };
6165 builder.write_frame(frame, stats);
6166 }
6167 }
6168 }
6169
6170 if space_id == SpaceId::Data
6172 && !scheduling_info.is_abandoned
6173 && scheduling_info.may_send_data
6174 && space.pending.max_path_id
6175 && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
6176 {
6177 let frame = frame::MaxPathId(self.local_max_path_id);
6178 builder.write_frame(frame, stats);
6179 space.pending.max_path_id = false;
6180 }
6181
6182 if space_id == SpaceId::Data
6184 && !scheduling_info.is_abandoned
6185 && scheduling_info.may_send_data
6186 && space.pending.paths_blocked
6187 && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
6188 {
6189 let frame = frame::PathsBlocked(self.remote_max_path_id);
6190 builder.write_frame(frame, stats);
6191 space.pending.paths_blocked = false;
6192 }
6193
6194 while space_id == SpaceId::Data
6196 && !scheduling_info.is_abandoned
6197 && scheduling_info.may_send_data
6198 && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
6199 {
6200 let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
6201 break;
6202 };
6203 let next_seq = match self.remote_cids.get(&path_id) {
6204 Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
6205 None => VarInt(0),
6206 };
6207 let frame = frame::PathCidsBlocked { path_id, next_seq };
6208 builder.write_frame(frame, stats);
6209 }
6210
6211 if space_id == SpaceId::Data
6213 && !scheduling_info.is_abandoned
6214 && scheduling_info.may_send_data
6215 {
6216 self.streams
6217 .write_control_frames(builder, &mut space.pending, stats);
6218 }
6219
6220 let cid_len = self
6222 .local_cid_state
6223 .values()
6224 .map(|cid_state| cid_state.cid_len())
6225 .max()
6226 .expect("some local CID state must exist");
6227 let new_cid_size_bound =
6228 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
6229 while !scheduling_info.is_abandoned
6230 && scheduling_info.may_send_data
6231 && builder.frame_space_remaining() > new_cid_size_bound
6232 {
6233 let Some(issued) = space.pending.new_cids.pop() else {
6234 break;
6235 };
6236 let retire_prior_to = self
6237 .local_cid_state
6238 .get(&issued.path_id)
6239 .map(|cid_state| cid_state.retire_prior_to())
6240 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
6241
6242 let cid_path_id = match is_multipath_negotiated {
6243 true => Some(issued.path_id),
6244 false => {
6245 debug_assert_eq!(issued.path_id, PathId::ZERO);
6246 None
6247 }
6248 };
6249 let frame = frame::NewConnectionId {
6250 path_id: cid_path_id,
6251 sequence: issued.sequence,
6252 retire_prior_to,
6253 id: issued.id,
6254 reset_token: issued.reset_token,
6255 };
6256 builder.write_frame(frame, stats);
6257 }
6258
6259 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
6261 while !scheduling_info.is_abandoned
6262 && scheduling_info.may_send_data
6263 && builder.frame_space_remaining() > retire_cid_bound
6264 {
6265 let (path_id, sequence) = match space.pending.retire_cids.pop() {
6266 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
6267 Some((path_id, seq)) => (Some(path_id), seq),
6268 None => break,
6269 };
6270 let frame = frame::RetireConnectionId { path_id, sequence };
6271 builder.write_frame(frame, stats);
6272 }
6273
6274 let mut sent_datagrams = false;
6276 while !scheduling_info.is_abandoned
6277 && scheduling_info.may_send_data
6278 && builder.frame_space_remaining() > Datagram::SIZE_BOUND
6279 && space_id == SpaceId::Data
6280 {
6281 match self.datagrams.write(builder, stats) {
6282 true => {
6283 sent_datagrams = true;
6284 }
6285 false => break,
6286 }
6287 }
6288 if self.datagrams.send_blocked && sent_datagrams {
6289 self.events.push_back(Event::DatagramsUnblocked);
6290 self.datagrams.send_blocked = false;
6291 }
6292
6293 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
6294
6295 if !scheduling_info.is_abandoned && scheduling_info.may_send_data {
6297 while let Some(network_path) = space.pending.new_tokens.pop() {
6298 debug_assert_eq!(space_id, SpaceId::Data);
6299 let ConnectionSide::Server { server_config } = &self.side else {
6300 panic!("NEW_TOKEN frames should not be enqueued by clients");
6301 };
6302
6303 if !network_path.is_probably_same_path(&path.network_path) {
6304 continue;
6309 }
6310
6311 let token = Token::new(
6312 TokenPayload::Validation {
6313 ip: network_path.remote.ip(),
6314 issued: server_config.time_source.now(),
6315 },
6316 &mut self.rng,
6317 );
6318 let new_token = NewToken {
6319 token: token.encode(&*server_config.token_key).into(),
6320 };
6321
6322 if builder.frame_space_remaining() < new_token.size() {
6323 space.pending.new_tokens.push(network_path);
6324 break;
6325 }
6326
6327 builder.write_frame(new_token, stats);
6328 builder.retransmits_mut().new_tokens.push(network_path);
6329 }
6330 }
6331
6332 while space_id == SpaceId::Data
6334 && !scheduling_info.is_abandoned
6335 && scheduling_info.may_send_data
6336 && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
6337 {
6338 if let Some(added_address) = space.pending.add_address.pop_last() {
6339 builder.write_frame(added_address, stats);
6340 } else {
6341 break;
6342 }
6343 }
6344
6345 while space_id == SpaceId::Data
6347 && !scheduling_info.is_abandoned
6348 && scheduling_info.may_send_data
6349 && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
6350 {
6351 if let Some(removed_address) = space.pending.remove_address.pop_last() {
6352 builder.write_frame(removed_address, stats);
6353 } else {
6354 break;
6355 }
6356 }
6357
6358 if !scheduling_info.is_abandoned
6360 && scheduling_info.may_send_data
6361 && space_id == SpaceId::Data
6362 {
6363 self.streams
6364 .write_stream_frames(builder, self.config.send_fairness, stats);
6365 }
6366 }
6367
6368 fn populate_acks<'a, 'b>(
6370 now: Instant,
6371 receiving_ecn: bool,
6372 path_id: PathId,
6373 space_id: SpaceId,
6374 space: &mut PacketSpace,
6375 is_multipath_negotiated: bool,
6376 builder: &mut PacketBuilder<'a, 'b>,
6377 stats: &mut FrameStats,
6378 space_has_keys: bool,
6379 ) {
6380 debug_assert!(space_has_keys, "tried to send ACK in 0-RTT");
6382
6383 debug_assert!(
6384 is_multipath_negotiated || path_id == PathId::ZERO,
6385 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
6386 );
6387 if is_multipath_negotiated {
6388 debug_assert!(
6389 space_id == SpaceId::Data || path_id == PathId::ZERO,
6390 "path acks must be sent in 1RTT space (have {space_id:?})"
6391 );
6392 }
6393
6394 let pns = space.for_path(path_id);
6395 let ranges = pns.pending_acks.ranges();
6396 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
6397 let ecn = if receiving_ecn {
6398 Some(&pns.ecn_counters)
6399 } else {
6400 None
6401 };
6402
6403 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
6404 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
6406 let delay = delay_micros >> ack_delay_exp.into_inner();
6407
6408 if is_multipath_negotiated && space_id == SpaceId::Data {
6409 if !ranges.is_empty() {
6410 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
6411 builder.write_frame(frame, stats);
6412 }
6413 } else {
6414 builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
6415 }
6416 }
6417
6418 fn close_common(&mut self) {
6419 trace!("connection closed");
6420 self.timers.reset();
6421 }
6422
6423 fn set_close_timer(&mut self, now: Instant) {
6424 let pto_max = self.max_pto_for_space(self.highest_space);
6427 self.timers.set(
6428 Timer::Conn(ConnTimer::Close),
6429 now + 3 * pto_max,
6430 self.qlog.with_time(now),
6431 );
6432 }
6433
6434 fn handle_peer_params(
6439 &mut self,
6440 params: TransportParameters,
6441 local_cid: ConnectionId,
6442 remote_cid: ConnectionId,
6443 now: Instant,
6444 ) -> Result<(), TransportError> {
6445 if Some(self.original_remote_cid) != params.initial_src_cid
6446 || (self.side.is_client()
6447 && (Some(self.initial_dst_cid) != params.original_dst_cid
6448 || self.retry_src_cid != params.retry_src_cid))
6449 {
6450 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
6451 "CID authentication failure",
6452 ));
6453 }
6454 if params.initial_max_path_id.is_some() && (local_cid.is_empty() || remote_cid.is_empty()) {
6455 return Err(TransportError::PROTOCOL_VIOLATION(
6456 "multipath must not use zero-length CIDs",
6457 ));
6458 }
6459
6460 self.set_peer_params(params);
6461 self.qlog.emit_peer_transport_params_received(self, now);
6462
6463 Ok(())
6464 }
6465
6466 fn set_peer_params(&mut self, params: TransportParameters) {
6467 self.streams.set_params(¶ms);
6468 self.idle_timeout =
6469 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
6470 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
6471
6472 if let Some(ref info) = params.preferred_address {
6473 self.remote_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
6475 path_id: None,
6476 sequence: 1,
6477 id: info.connection_id,
6478 reset_token: info.stateless_reset_token,
6479 retire_prior_to: 0,
6480 })
6481 .expect(
6482 "preferred address CID is the first received, and hence is guaranteed to be legal",
6483 );
6484 let remote = self.path_data(PathId::ZERO).network_path.remote;
6485 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
6486 }
6487 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
6488
6489 let mut multipath_enabled = None;
6490 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
6491 self.config.get_initial_max_path_id(),
6492 params.initial_max_path_id,
6493 ) {
6494 self.local_max_path_id = local_max_path_id;
6496 self.remote_max_path_id = remote_max_path_id;
6497 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
6498 debug!(%initial_max_path_id, "multipath negotiated");
6499 multipath_enabled = Some(initial_max_path_id);
6500 }
6501
6502 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
6503 self.config
6504 .max_remote_nat_traversal_addresses
6505 .zip(params.max_remote_nat_traversal_addresses)
6506 {
6507 if let Some(max_initial_paths) =
6508 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
6509 {
6510 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
6511 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
6512 self.n0_nat_traversal = n0_nat_traversal::State::new(
6513 max_remote_addresses,
6514 max_local_addresses,
6515 self.side(),
6516 );
6517 debug!(
6518 %max_remote_addresses, %max_local_addresses,
6519 "n0's nat traversal negotiated"
6520 );
6521
6522 match self.side() {
6523 Side::Client => {
6524 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
6525 debug!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
6528 } else if max_local_addresses as u64
6529 > params.active_connection_id_limit.into_inner()
6530 {
6531 debug!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
6535 }
6536 }
6537 Side::Server => {
6538 if (max_initial_paths.as_u32() as u64) < crate::LOCAL_CID_COUNT {
6539 debug!(%max_initial_paths, local_cid_limit=%crate::LOCAL_CID_COUNT, "local server configuration might cause nat traversal issues")
6540 }
6541 }
6542 }
6543 } else {
6544 debug!("n0 nat traversal enabled for both endpoints, but multipath is missing")
6545 }
6546 }
6547
6548 self.peer_params = params;
6549 let peer_max_udp_payload_size =
6550 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
6551 self.path_data_mut(PathId::ZERO)
6552 .mtud
6553 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
6554 }
6555
6556 fn decrypt_packet(
6558 &mut self,
6559 now: Instant,
6560 path_id: PathId,
6561 packet: &mut Packet,
6562 ) -> Result<Option<u64>, Option<TransportError>> {
6563 let result = self
6564 .crypto_state
6565 .decrypt_packet_body(packet, path_id, &self.spaces)?;
6566
6567 let Some(result) = result else {
6568 return Ok(None);
6569 };
6570
6571 if result.outgoing_key_update_acked
6572 && let Some(prev) = self.crypto_state.prev_crypto.as_mut()
6573 {
6574 prev.end_packet = Some((result.packet_number, now));
6575 self.set_key_discard_timer(now, packet.header.space());
6576 }
6577
6578 if result.incoming_key_update {
6579 trace!("key update authenticated");
6580 self.crypto_state
6581 .update_keys(Some((result.packet_number, now)), true);
6582 self.set_key_discard_timer(now, packet.header.space());
6583 }
6584
6585 Ok(Some(result.packet_number))
6586 }
6587
6588 fn peer_supports_ack_frequency(&self) -> bool {
6589 self.peer_params.min_ack_delay.is_some()
6590 }
6591
6592 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
6597 debug_assert_eq!(
6598 self.highest_space,
6599 SpaceKind::Data,
6600 "immediate ack must be written in the data space"
6601 );
6602 self.spaces[SpaceId::Data]
6603 .for_path(path_id)
6604 .immediate_ack_pending = true;
6605 }
6606
/// Test helper: decrypts the packet carried by a datagram `event` and returns its payload.
///
/// Returns `None` for non-datagram events and whenever header unprotection or body
/// decryption fails. Panics if the datagram contains coalesced packets.
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let ConnectionEventInner::Datagram(datagram) = &event.0 else {
        return None;
    };
    let DatagramConnectionEvent {
        path_id,
        first_decode,
        remaining,
        ..
    } = datagram;

    assert!(
        remaining.is_none(),
        "Packets should never be coalesced in tests"
    );

    let reset_token = self.peer_params.stateless_reset_token;
    let unprotected = self
        .crypto_state
        .unprotect_header(first_decode.clone(), reset_token)?;

    let mut packet = unprotected.packet?;
    self.crypto_state
        .decrypt_packet_body(&mut packet, *path_id, &self.spaces)
        .ok()?;

    Some(packet.payload.to_vec())
}
6635
/// Test helper: number of bytes currently in flight on path zero.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    let path = self.path_data(PathId::ZERO);
    path.in_flight.bytes
}
6643
/// Test helper: congestion window of path zero minus the bytes in flight, saturating at zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    let path = self.path_data(PathId::ZERO);
    let window = path.congestion.window();
    window.saturating_sub(path.in_flight.bytes)
}
6652
/// Test helper: reports whether the connection has no pending work scheduled.
///
/// Keep-alive, CID push and key discard timers are ignored. The connection counts as idle
/// when no other timer is set, or when the earliest remaining timer is an idle timer.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let earliest_relevant = self
        .timers
        .values()
        .into_iter()
        .filter(|(timer, _)| match timer {
            Timer::Conn(ConnTimer::KeepAlive)
            | Timer::PerPath(_, PathTimer::PathKeepAlive)
            | Timer::Conn(ConnTimer::PushNewCid)
            | Timer::Conn(ConnTimer::KeyDiscard) => false,
            _ => true,
        })
        .min_by_key(|(_, time)| *time);

    match earliest_relevant {
        None => true,
        Some((timer, _)) => matches!(
            timer,
            Timer::Conn(ConnTimer::Idle) | Timer::PerPath(_, PathTimer::PathIdle)
        ),
    }
}
6676
/// Test helper: the `sending_ecn` flag of path zero.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    self.path_data(PathId::ZERO).sending_ecn
}
6682
/// Test helper: the `total_recvd` counter of path zero.
///
/// NOTE(review): presumably a byte count — confirm against the path data definition.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    self.path_data(PathId::ZERO).total_recvd
}
6688
/// Test helper: the `active_seq()` pair reported by the local CID state of path zero.
///
/// # Panics
///
/// Panics if there is no local CID state for path zero.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    self.local_cid_state
        .get(&PathId::ZERO)
        .unwrap()
        .active_seq()
}
6696
/// Test helper: the `active_seq()` pair reported by the local CID state of the path
/// with the given numeric id.
///
/// # Panics
///
/// Panics if there is no local CID state for that path.
#[cfg(test)]
#[track_caller]
pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
    self.local_cid_state
        .get(&PathId(path_id))
        .unwrap()
        .active_seq()
}
6705
/// Test helper: asks the local CID state of path zero to assign retire sequence `v`
/// and queues a `NeedIdentifiers` endpoint event carrying the value it returned.
///
/// # Panics
///
/// Panics if there is no local CID state for path zero. In debug builds it also asserts
/// that the connection is not drained.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let cid_state = self.local_cid_state.get_mut(&PathId::ZERO).unwrap();
    let needed = cid_state.assign_retire_seq(v);
    debug_assert!(!self.state.is_drained());
    let request = EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, needed);
    self.endpoint_events.push_back(request);
}
6719
/// Test helper: the `active_seq()` value of the remote CID set of path zero.
///
/// # Panics
///
/// Panics if there are no remote CIDs for path zero.
#[cfg(test)]
pub(crate) fn active_remote_cid_seq(&self) -> u64 {
    self.remote_cids.get(&PathId::ZERO).unwrap().active_seq()
}
6725
/// Test helper: the current MTU reported by the path data of `path_id`.
#[cfg(test)]
pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
    self.path_data(path_id).current_mtu()
}
6731
/// Test helper: sets `pending_on_path_challenge` on every known path.
#[cfg(test)]
pub(crate) fn trigger_path_validation(&mut self) {
    self.paths
        .values_mut()
        .for_each(|path| path.data.pending_on_path_challenge = true);
}
6739
/// Test helper: closes the connection as if a protocol violation had been detected.
///
/// Does nothing when the connection is already closed. Otherwise the state is moved to
/// closed with a `PROTOCOL_VIOLATION` error, the common close handling runs, the close
/// timer is armed unless the connection is drained, and a connection close is marked
/// as pending.
#[cfg(test)]
pub fn simulate_protocol_violation(&mut self, now: Instant) {
    if self.state.is_closed() {
        return;
    }

    let violation = TransportError::PROTOCOL_VIOLATION("simulated violation");
    self.state.move_to_closed(violation);
    self.close_common();
    if !self.state.is_drained() {
        self.set_close_timer(now);
    }
    self.connection_close_pending = true;
}
6753
6754 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6765 let space_specific = self.paths.get(&path_id).is_some_and(|path| {
6766 path.data.pending_on_path_challenge || !path.data.path_responses.is_empty()
6767 });
6768
6769 let other = self.streams.can_send_stream_data()
6771 || self
6772 .datagrams
6773 .outgoing
6774 .front()
6775 .is_some_and(|x| x.size(true) <= max_size);
6776
6777 SendableFrames {
6779 acks: false,
6780 close: false,
6781 space_specific,
6782 other,
6783 }
6784 }
6785
/// Terminates the connection immediately with `reason`.
///
/// Runs the common close handling, moves the state to drained with `reason` recorded,
/// and queues a `Drained` endpoint event.
fn kill(&mut self, reason: ConnectionError) {
    self.close_common();
    self.state.move_to_drained(Some(reason));
    self.endpoint_events.push_back(EndpointEventInner::Drained);
}
6794
6795 pub fn current_mtu(&self) -> u16 {
6802 self.paths
6803 .iter()
6804 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6805 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6806 .min()
6807 .unwrap_or(INITIAL_MTU)
6808 }
6809
6810 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6817 let pn_len = PacketNumber::new(
6818 pn,
6819 self.spaces[SpaceId::Data]
6820 .for_path(path)
6821 .largest_acked_packet
6822 .unwrap_or(0),
6823 )
6824 .len();
6825
6826 1 + self
6828 .remote_cids
6829 .get(&path)
6830 .map(|cids| cids.active().len())
6831 .unwrap_or(20) + pn_len
6833 + self.tag_len_1rtt()
6834 }
6835
6836 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6837 let pn_len = 4;
6838
6839 let cid_len = self
6840 .remote_cids
6841 .values()
6842 .map(|cids| cids.active().len())
6843 .max()
6844 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6848 }
6849
6850 fn tag_len_1rtt(&self) -> usize {
6851 let packet_crypto = self
6853 .crypto_state
6854 .encryption_keys(SpaceKind::Data, self.side.side())
6855 .map(|(_header, packet, _level)| packet);
6856 packet_crypto.map_or(16, |x| x.tag_len())
6860 }
6861
6862 fn on_path_validated(&mut self, path_id: PathId) {
6864 self.path_data_mut(path_id).validated = true;
6865 let ConnectionSide::Server { server_config } = &self.side else {
6866 return;
6867 };
6868 let network_path = self.path_data(path_id).network_path;
6869 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6870 new_tokens.clear();
6871 for _ in 0..server_config.validation_token.sent {
6872 new_tokens.push(network_path);
6873 }
6874 }
6875
6876 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6878 if let Some(path) = self.paths.get_mut(&path_id) {
6879 path.data.status.remote_update(status, status_seq_no);
6880 } else {
6881 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6882 }
6883 self.events.push_back(
6884 PathEvent::RemoteStatus {
6885 id: path_id,
6886 status,
6887 }
6888 .into(),
6889 );
6890 }
6891
6892 fn max_path_id(&self) -> Option<PathId> {
6901 if self.is_multipath_negotiated() {
6902 Some(self.remote_max_path_id.min(self.local_max_path_id))
6903 } else {
6904 None
6905 }
6906 }
6907
6908 fn is_ipv6(&self) -> bool {
6913 self.paths
6914 .values()
6915 .any(|p| p.data.network_path.remote.is_ipv6())
6916 }
6917
6918 pub fn add_nat_traversal_address(
6920 &mut self,
6921 address: SocketAddr,
6922 ) -> Result<(), n0_nat_traversal::Error> {
6923 if let Some(added) = self.n0_nat_traversal.add_local_address(address)? {
6924 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6925 };
6926 Ok(())
6927 }
6928
6929 pub fn remove_nat_traversal_address(
6933 &mut self,
6934 address: SocketAddr,
6935 ) -> Result<(), n0_nat_traversal::Error> {
6936 if let Some(removed) = self.n0_nat_traversal.remove_local_address(address)? {
6937 self.spaces[SpaceId::Data]
6938 .pending
6939 .remove_address
6940 .insert(removed);
6941 }
6942 Ok(())
6943 }
6944
/// Returns the local NAT traversal addresses as reported by the NAT traversal state.
///
/// # Errors
///
/// Propagates any error returned by the NAT traversal state.
pub fn get_local_nat_traversal_addresses(
    &self,
) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
    self.n0_nat_traversal.get_local_nat_traversal_addresses()
}
6951
6952 pub fn get_remote_nat_traversal_addresses(
6954 &self,
6955 ) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
6956 Ok(self
6957 .n0_nat_traversal
6958 .client_side()?
6959 .get_remote_nat_traversal_addresses())
6960 }
6961
6962 fn open_nat_traversal_path(
6966 &mut self,
6967 now: Instant,
6968 ip_port: (IpAddr, u16),
6969 ) -> Result<Option<(PathId, SocketAddr)>, PathError> {
6970 let remote = ip_port.into();
6971 let network_path = FourTuple {
6976 remote,
6977 local_ip: None,
6978 };
6979 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6980 Ok((path_id, path_was_known)) => {
6981 if path_was_known {
6982 trace!(%path_id, %remote, "nat traversal: path existed for remote, revalidating");
6983 if let Some(path) = self.paths.get_mut(&path_id) {
6984 use paths::OpenStatus::*;
6985
6986 path.data.pending_on_path_challenge = true;
6987 path.data.open_status = match path.data.open_status {
6988 Pending => Pending,
6993 Sent => Pending,
6997 Revalidating => Revalidating,
7000 Informed => Revalidating,
7006 }
7007 }
7008 }
7009 Ok(Some((path_id, remote)))
7010 }
7011 Err(e) => {
7012 debug!(%remote, %e, "nat traversal: failed to probe remote");
7013 Err(e)
7014 }
7015 }
7016 }
7017
/// Starts a new NAT traversal round and returns the remote addresses that are being
/// probed.
///
/// The client-side NAT traversal state supplies the new round, the addresses to
/// announce via the pending `reach_out` of the data space, the addresses to probe and
/// the path ids of the previous round. Unvalidated, not yet abandoned paths from the
/// previous round whose remote is not probed again are closed. A path is then opened
/// (or revalidated) for every address to probe, and the resulting path ids are stored
/// as the path ids of this round.
///
/// # Errors
///
/// - [`n0_nat_traversal::Error::Closed`] when the connection is closed.
/// - Any error from obtaining the client-side state or from starting the round.
/// - [`n0_nat_traversal::Error::Multipath`] with the first path error when opening
///   failed and no address could be probed at all.
pub fn initiate_nat_traversal_round(
    &mut self,
    now: Instant,
) -> Result<Vec<SocketAddr>, n0_nat_traversal::Error> {
    if self.state.is_closed() {
        return Err(n0_nat_traversal::Error::Closed);
    }

    let ipv6 = self.is_ipv6();
    let client_state = self.n0_nat_traversal.client_side_mut()?;
    let n0_nat_traversal::NatTraversalRound {
        new_round,
        reach_out_at,
        addresses_to_probe,
        prev_round_path_ids,
    } = client_state.initiate_nat_traversal_round(ipv6)?;

    trace!(%new_round, reach_out=reach_out_at.len(), to_probe=addresses_to_probe.len(),
        "initiating nat traversal round");

    self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

    // Clean up paths left over from the previous round.
    for path_id in prev_round_path_ids {
        let Some(path) = self.path(path_id) else {
            continue;
        };
        let ip = path.network_path.remote.ip();
        let port = path.network_path.remote.port();

        // Only close paths that are not probed again, never validated and not already
        // abandoned.
        if !addresses_to_probe
            .iter()
            .any(|(_, probe)| *probe == (ip, port))
            && !path.validated
            && !self.abandoned_paths.contains(&path_id)
        {
            trace!(%path_id, "closing path from previous round");
            // The result of closing is deliberately ignored.
            let _ =
                self.close_path_inner(now, path_id, PathAbandonReason::NatTraversalRoundEnded);
        }
    }

    // First path error encountered while opening paths, if any.
    let mut err = None;

    let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
    let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());

    for (id, address) in addresses_to_probe {
        match self.open_nat_traversal_path(now, address) {
            Ok(None) => {}
            Ok(Some((path_id, remote))) => {
                path_ids.push(path_id);
                probed_addresses.push(remote);
            }
            Err(e) => {
                // The client side was already obtained successfully above.
                self.n0_nat_traversal
                    .client_side_mut()
                    .expect("validated")
                    .report_in_continuation(id, e);
                err.get_or_insert(e);
            }
        }
    }

    // An error is only surfaced when not a single address could be probed.
    if let Some(err) = err {
        if probed_addresses.is_empty() {
            return Err(n0_nat_traversal::Error::Multipath(err));
        }
    }

    self.n0_nat_traversal
        .client_side_mut()
        .expect("connection side validated")
        .set_round_path_ids(path_ids);

    Ok(probed_addresses)
}
7107
7108 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
7113 let ipv6 = self.is_ipv6();
7114 let client_state = self.n0_nat_traversal.client_side_mut().ok()?;
7115 let (id, address) = client_state.continue_nat_traversal_round(ipv6)?;
7116 let open_result = self.open_nat_traversal_path(now, address);
7117 let client_state = self.n0_nat_traversal.client_side_mut().expect("validated");
7118 match open_result {
7119 Ok(None) => Some(true),
7120 Ok(Some((path_id, _remote))) => {
7121 client_state.add_round_path_id(path_id);
7122 Some(true)
7123 }
7124 Err(e) => {
7125 client_state.report_in_continuation(id, e);
7126 Some(false)
7127 }
7128 }
7129 }
7130
7131 fn is_handshake_confirmed(&self) -> bool {
7140 !self.is_handshaking() && !self.crypto_state.has_keys(EncryptionLevel::Handshake)
7141 }
7142}
7143
7144impl fmt::Debug for Connection {
7145 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
7146 f.debug_struct("Connection")
7147 .field("handshake_cid", &self.handshake_cid)
7148 .finish()
7149 }
7150}
7151
/// Hint about which paths can be recovered.
///
/// NOTE(review): the name suggests this is consulted after a network change; the call
/// site is not visible from here — confirm.
pub trait NetworkChangeHint: std::fmt::Debug + 'static {
    /// Returns whether the path `path_id` with the given `network_path` is considered
    /// recoverable.
    fn is_path_recoverable(&self, path_id: PathId, network_path: FourTuple) -> bool;
}
7164
/// Outcome of polling a packet number space of a path for data to transmit.
///
/// NOTE(review): the variant semantics below are inferred from names only; the producer
/// and consumer of this type are not visible from here — confirm.
#[derive(Debug)]
enum PollPathSpaceStatus {
    /// Nothing was written.
    NothingToSend {
        /// Presumably set when congestion control prevented sending — TODO confirm.
        congestion_blocked: bool,
    },
    /// A packet was written.
    WrotePacket {
        /// Packet number of the last packet involved.
        last_packet_number: u64,
        /// Padding request for the datagram.
        pad_datagram: PadDatagram,
    },
    /// Presumably signals that the datagram should be sent now — TODO confirm.
    Send {
        /// Packet number of the last packet involved.
        last_packet_number: u64,
    },
}
7203
/// Per-path flags used when deciding what may be sent on a path.
///
/// NOTE(review): field meanings are inferred from their names; the code computing them
/// is not visible from here — confirm.
#[derive(Debug, Copy, Clone)]
struct PathSchedulingInfo {
    /// Whether the path is abandoned.
    is_abandoned: bool,
    /// Whether data may be sent on the path.
    may_send_data: bool,
    /// Whether a close may be sent on the path.
    may_send_close: bool,
    /// Presumably whether the path may carry its own abandon — TODO confirm.
    may_self_abandon: bool,
}
7243
/// Reason why a path is blocked, if it is.
///
/// NOTE(review): variant meanings are inferred from their names — confirm against the
/// code producing them.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// Not blocked.
    No,
    /// Blocked by the anti-amplification limit.
    AntiAmplification,
    /// Blocked by congestion control.
    Congestion,
    /// Blocked by pacing.
    Pacing,
}
7251
/// Side-specific state of a connection: client or server.
enum ConnectionSide {
    /// State held by a client connection.
    Client {
        /// Token taken from `token_store` for `server_name` when the side is created
        /// (empty by default when the store has none).
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Server name used as the key into `token_store`.
        server_name: String,
    },
    /// State held by a server connection.
    Server {
        /// Configuration of the server.
        server_config: Arc<ServerConfig>,
    },
}
7264
7265impl ConnectionSide {
7266 fn remote_may_migrate(&self, state: &State) -> bool {
7267 match self {
7268 Self::Server { server_config } => server_config.migration,
7269 Self::Client { .. } => {
7270 if let Some(hs) = state.as_handshake() {
7271 hs.allow_server_migration
7272 } else {
7273 false
7274 }
7275 }
7276 }
7277 }
7278
7279 fn is_client(&self) -> bool {
7280 self.side().is_client()
7281 }
7282
7283 fn is_server(&self) -> bool {
7284 self.side().is_server()
7285 }
7286
7287 fn side(&self) -> Side {
7288 match *self {
7289 Self::Client { .. } => Side::Client,
7290 Self::Server { .. } => Side::Server,
7291 }
7292 }
7293}
7294
7295impl From<SideArgs> for ConnectionSide {
7296 fn from(side: SideArgs) -> Self {
7297 match side {
7298 SideArgs::Client {
7299 token_store,
7300 server_name,
7301 } => Self::Client {
7302 token: token_store.take(&server_name).unwrap_or_default(),
7303 token_store,
7304 server_name,
7305 },
7306 SideArgs::Server {
7307 server_config,
7308 pref_addr_cid: _,
7309 path_validated: _,
7310 } => Self::Server { server_config },
7311 }
7312 }
7313}
7314
/// Side-specific arguments used to construct a [`ConnectionSide`].
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// Configuration of the server.
        server_config: Arc<ServerConfig>,
        /// NOTE(review): presumably the CID issued for the preferred address — confirm.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already considered validated.
        path_validated: bool,
    },
}
7327
7328impl SideArgs {
7329 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
7330 match *self {
7331 Self::Client { .. } => None,
7332 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
7333 }
7334 }
7335
7336 pub(crate) fn path_validated(&self) -> bool {
7337 match *self {
7338 Self::Client { .. } => true,
7339 Self::Server { path_validated, .. } => path_validated,
7340 }
7341 }
7342
7343 pub(crate) fn side(&self) -> Side {
7344 match *self {
7345 Self::Client { .. } => Side::Client,
7346 Self::Server { .. } => Side::Server,
7347 }
7348 }
7349}
7350
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport error; its message is that of the wrapped [`TransportError`].
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The connection was aborted by the peer; carries the peer's connection close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The connection was closed by the peer; carries the peer's application close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed.
    ///
    /// NOTE(review): the variant name suggests the close was initiated locally — confirm.
    #[error("closed")]
    LocallyClosed,
    /// The connection ran out of connection ids.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
7385
/// Converts a received close into the matching connection error variant.
impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            // A connection-level close maps to `ConnectionClosed`.
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            // An application-level close maps to `ApplicationClosed`.
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}
7394
7395impl From<ConnectionError> for io::Error {
7397 fn from(x: ConnectionError) -> Self {
7398 use ConnectionError::*;
7399 let kind = match x {
7400 TimedOut => io::ErrorKind::TimedOut,
7401 Reset => io::ErrorKind::ConnectionReset,
7402 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
7403 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
7404 io::ErrorKind::Other
7405 }
7406 };
7407 Self::new(kind, x)
7408 }
7409}
7410
7411#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
7414pub enum PathError {
7415 #[error("multipath extension not negotiated")]
7417 MultipathNotNegotiated,
7418 #[error("the server side may not open a path")]
7420 ServerSideNotAllowed,
7421 #[error("maximum number of concurrent paths reached")]
7423 MaxPathIdReached,
7424 #[error("remoted CIDs exhausted")]
7426 RemoteCidsExhausted,
7427 #[error("path validation failed")]
7429 ValidationFailed,
7430 #[error("invalid remote address")]
7432 InvalidRemoteAddress(SocketAddr),
7433}
7434
/// Errors that can occur when closing a path.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The multipath extension was not negotiated.
    #[error("Multipath extension not negotiated")]
    MultipathNotNegotiated,
    /// The path is closed.
    #[error("closed path")]
    ClosedPath,
    /// The path is the last open path.
    ///
    /// NOTE(review): presumably closing the last open path is refused — confirm.
    #[error("last open path")]
    LastOpenPath,
}
7450
/// Error indicating that the multipath extension was not negotiated.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private field so the type cannot be constructed outside of this module.
    _private: (),
}
7457
/// Events of interest to the application.
///
/// NOTE(review): the exact conditions under which each event is emitted are not visible
/// from here; the variant descriptions follow the variant names — confirm.
#[derive(Debug)]
pub enum Event {
    /// Handshake data is ready.
    HandshakeDataReady,
    /// The connection was established.
    Connected,
    /// The handshake was confirmed.
    HandshakeConfirmed,
    /// The connection was lost.
    ConnectionLost {
        /// Reason the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// A datagram was received.
    DatagramReceived,
    /// Datagram sending is no longer blocked.
    DatagramsUnblocked,
    /// A path-related event.
    Path(PathEvent),
    /// A NAT traversal event.
    NatTraversal(n0_nat_traversal::Event),
}
7488
/// Wraps a [`PathEvent`] in [`Event::Path`].
impl From<PathEvent> for Event {
    fn from(source: PathEvent) -> Self {
        Self::Path(source)
    }
}
7494
7495fn get_max_ack_delay(params: &TransportParameters) -> Duration {
7496 Duration::from_micros(params.max_ack_delay.0 * 1000)
7497}
7498
/// NOTE(review): presumably the upper bound on the exponent used for exponential
/// backoff; its use is not visible from here — confirm.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// NOTE(review): presumably the upper bound on the interval between PTO probes —
/// confirm against the loss detection code.
const MAX_PTO_INTERVAL: Duration = Duration::from_secs(2);

/// NOTE(review): presumably the minimum idle timeout required before the faster PTO
/// interval is used — confirm.
const MIN_IDLE_FOR_FAST_PTO: Duration = Duration::from_secs(25);

/// NOTE(review): presumably the PTO interval bound used in the "fast" case — confirm.
const MAX_PTO_FAST_INTERVAL: Duration = Duration::from_secs(1);

/// Two thirds of [`MAX_PTO_INTERVAL`], truncated to whole milliseconds.
///
/// NOTE(review): presumably RTTs above this are treated as slow — confirm.
const SLOW_RTT_THRESHOLD: Duration =
    Duration::from_millis((MAX_PTO_INTERVAL.as_millis() as u64 * 2) / 3);

/// [`MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE`] plus 32 bytes.
///
/// NOTE(review): presumably the minimum space that must remain to start another
/// packet — confirm.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest size assumed for a Handshake or 0-RTT packet header.
///
/// NOTE(review): the terms presumably correspond to the flags byte, the version, the
/// destination CID length and CID, the source CID length and CID, the length field
/// (sized for `u16::MAX`) and a 4-byte packet number — confirm against RFC 9000 §17.2.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
7539
/// Record of the frames written into a packet.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable state recorded for the written frames.
    retransmits: ThinRetransmits,
    /// Largest packet number acknowledged by a written ACK frame, per path.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the written stream frames.
    stream_frames: StreamMetaVec,
    /// Set when a frame was written that is not tracked in `retransmits`
    /// (e.g. PING, PATH_CHALLENGE, PATH_RESPONSE, IMMEDIATE_ACK, DATAGRAM).
    non_retransmits: bool,
    /// NOTE(review): presumably set when the packet must be padded — confirm.
    requires_padding: bool,
}
7551
7552impl SentFrames {
7553 fn is_ack_only(&self, streams: &StreamsState) -> bool {
7555 !self.largest_acked.is_empty()
7556 && !self.non_retransmits
7557 && self.stream_frames.is_empty()
7558 && self.retransmits.is_empty(streams)
7559 }
7560
7561 fn retransmits_mut(&mut self) -> &mut Retransmits {
7562 self.retransmits.get_or_create()
7563 }
7564
7565 fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
7566 use frame::EncodableFrame::*;
7567 match frame {
7568 PathAck(path_ack_encoder) => {
7569 if let Some(max) = path_ack_encoder.ranges.max() {
7570 self.largest_acked.insert(path_ack_encoder.path_id, max);
7571 }
7572 }
7573 Ack(ack_encoder) => {
7574 if let Some(max) = ack_encoder.ranges.max() {
7575 self.largest_acked.insert(PathId::ZERO, max);
7576 }
7577 }
7578 Close(_) => { }
7579 PathResponse(_) => self.non_retransmits = true,
7580 HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
7581 ReachOut(frame::ReachOut { round, ip, port }) => {
7582 let (recorded_round, reach_outs) = self
7583 .retransmits_mut()
7584 .reach_out
7585 .get_or_insert_with(|| (round, FxHashSet::default()));
7586 if *recorded_round == round {
7588 reach_outs.insert((ip, port));
7590 } else if *recorded_round < round {
7591 *recorded_round = round;
7593 reach_outs.drain();
7594 reach_outs.insert((ip, port));
7595 } else {
7596 }
7598 }
7599
7600 ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
7601 Ping(_) => self.non_retransmits = true,
7602 ImmediateAck(_) => self.non_retransmits = true,
7603 AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
7604 PathChallenge(_) => self.non_retransmits = true,
7605 Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
7606 PathAbandon(path_abandon) => {
7607 self.retransmits_mut()
7608 .path_abandon
7609 .entry(path_abandon.path_id)
7610 .or_insert(path_abandon.error_code);
7611 }
7612 PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
7613 | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
7614 self.retransmits_mut().path_status.insert(path_id);
7615 }
7616 MaxPathId(_) => self.retransmits_mut().max_path_id = true,
7617 PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
7618 PathCidsBlocked(path_cids_blocked) => {
7619 self.retransmits_mut()
7620 .path_cids_blocked
7621 .insert(path_cids_blocked.path_id);
7622 }
7623 ResetStream(reset) => self
7624 .retransmits_mut()
7625 .reset_stream
7626 .push((reset.id, reset.error_code)),
7627 StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
7628 NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
7629 RetireConnectionId(retire_cid) => self
7630 .retransmits_mut()
7631 .retire_cids
7632 .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
7633 Datagram(_) => self.non_retransmits = true,
7634 NewToken(_) => {}
7635 AddAddress(add_address) => {
7636 self.retransmits_mut().add_address.insert(add_address);
7637 }
7638 RemoveAddress(remove_address) => {
7639 self.retransmits_mut().remove_address.insert(remove_address);
7640 }
7641 StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
7642 MaxData(_) => self.retransmits_mut().max_data = true,
7643 MaxStreamData(max) => {
7644 self.retransmits_mut().max_stream_data.insert(max.id);
7645 }
7646 MaxStreams(max_streams) => {
7647 self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
7648 }
7649 StreamsBlocked(streams_blocked) => {
7650 self.retransmits_mut().streams_blocked[streams_blocked.dir as usize] = true
7651 }
7652 }
7653 }
7654}
7655
7656fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
7664 match (x, y) {
7665 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
7666 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
7667 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
7668 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
7669 }
7670}
7671
#[cfg(test)]
mod tests {
    use super::*;

    /// The negotiated idle timeout must not depend on the order of the arguments.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let timeout = |raw: u64| Some(VarInt(raw));
        let millis = |ms: u64| Some(Duration::from_millis(ms));

        // (one side, other side, expected negotiated timeout)
        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, timeout(0), None),
            (None, timeout(2), millis(2)),
            (timeout(0), timeout(0), None),
            (timeout(2), timeout(0), millis(2)),
            (timeout(1), timeout(4), millis(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}