1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
22 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
23 TransportError, TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 config::{ServerConfig, TransportConfig},
27 congestion::Controller,
28 connection::{
29 qlog::{QlogRecvPacket, QlogSink},
30 spaces::LostPacket,
31 timer::{ConnTimer, PathTimer},
32 },
33 crypto::{self, KeyPair, Keys, PacketKey},
34 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
35 iroh_hp,
36 packet::{
37 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
38 PacketNumber, PartialDecode, SpaceId,
39 },
40 range_set::ArrayRangeSet,
41 shared::{
42 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
43 EndpointEvent, EndpointEventInner,
44 },
45 token::{ResetToken, Token, TokenPayload},
46 transport_parameters::TransportParameters,
47};
48
49mod ack_frequency;
50use ack_frequency::AckFrequencyState;
51
52mod assembler;
53pub use assembler::Chunk;
54
55mod cid_state;
56use cid_state::CidState;
57
58mod datagrams;
59use datagrams::DatagramState;
60pub use datagrams::{Datagrams, SendDatagramError};
61
62mod mtud;
63mod pacing;
64
65mod packet_builder;
66use packet_builder::{PacketBuilder, PadDatagram};
67
68mod packet_crypto;
69use packet_crypto::{PrevCrypto, ZeroRttCrypto};
70
71mod paths;
72pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
73use paths::{PathData, PathState};
74
75pub(crate) mod qlog;
76pub(crate) mod send_buffer;
77
78mod spaces;
79#[cfg(fuzzing)]
80pub use spaces::Retransmits;
81#[cfg(not(fuzzing))]
82use spaces::Retransmits;
83use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
84
85mod stats;
86pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
87
88mod streams;
89#[cfg(fuzzing)]
90pub use streams::StreamsState;
91#[cfg(not(fuzzing))]
92use streams::StreamsState;
93pub use streams::{
94 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
95 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
96};
97
98mod timer;
99use timer::{Timer, TimerTable};
100
101mod transmit_buf;
102use transmit_buf::TransmitBuf;
103
104mod state;
105
106#[cfg(not(fuzzing))]
107use state::State;
108#[cfg(fuzzing)]
109pub use state::State;
110use state::StateType;
111
112pub struct Connection {
152 endpoint_config: Arc<EndpointConfig>,
153 config: Arc<TransportConfig>,
154 rng: StdRng,
155 crypto: Box<dyn crypto::Session>,
156 handshake_cid: ConnectionId,
158 rem_handshake_cid: ConnectionId,
160 paths: BTreeMap<PathId, PathState>,
166 path_counter: u64,
170 allow_mtud: bool,
172 state: State,
173 side: ConnectionSide,
174 zero_rtt_enabled: bool,
176 zero_rtt_crypto: Option<ZeroRttCrypto>,
178 key_phase: bool,
179 key_phase_size: u64,
181 peer_params: TransportParameters,
183 orig_rem_cid: ConnectionId,
185 initial_dst_cid: ConnectionId,
187 retry_src_cid: Option<ConnectionId>,
190 events: VecDeque<Event>,
192 endpoint_events: VecDeque<EndpointEventInner>,
193 spin_enabled: bool,
195 spin: bool,
197 spaces: [PacketSpace; 3],
199 highest_space: SpaceId,
201 prev_crypto: Option<PrevCrypto>,
203 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
208 accepted_0rtt: bool,
209 permit_idle_reset: bool,
211 idle_timeout: Option<Duration>,
213 timers: TimerTable,
214 authentication_failures: u64,
216
217 close: bool,
222
223 ack_frequency: AckFrequencyState,
227
228 receiving_ecn: bool,
233 total_authed_packets: u64,
235 app_limited: bool,
238
239 next_observed_addr_seq_no: VarInt,
244
245 streams: StreamsState,
246 rem_cids: FxHashMap<PathId, CidQueue>,
252 local_cid_state: FxHashMap<PathId, CidState>,
259 datagrams: DatagramState,
261 stats: ConnectionStats,
263 path_stats: FxHashMap<PathId, PathStats>,
265 version: u32,
267
268 max_concurrent_paths: NonZeroU32,
277 local_max_path_id: PathId,
292 remote_max_path_id: PathId,
298 max_path_id_with_cids: PathId,
304 abandoned_paths: FxHashSet<PathId>,
312
313 iroh_hp: iroh_hp::State,
314 qlog: QlogSink,
315}
316
317impl Connection {
318 pub(crate) fn new(
319 endpoint_config: Arc<EndpointConfig>,
320 config: Arc<TransportConfig>,
321 init_cid: ConnectionId,
322 loc_cid: ConnectionId,
323 rem_cid: ConnectionId,
324 network_path: FourTuple,
325 crypto: Box<dyn crypto::Session>,
326 cid_gen: &dyn ConnectionIdGenerator,
327 now: Instant,
328 version: u32,
329 allow_mtud: bool,
330 rng_seed: [u8; 32],
331 side_args: SideArgs,
332 qlog: QlogSink,
333 ) -> Self {
334 let pref_addr_cid = side_args.pref_addr_cid();
335 let path_validated = side_args.path_validated();
336 let connection_side = ConnectionSide::from(side_args);
337 let side = connection_side.side();
338 let mut rng = StdRng::from_seed(rng_seed);
339 let initial_space = {
340 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
341 space.crypto = Some(crypto.initial_keys(init_cid, side));
342 space
343 };
344 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
345 #[cfg(test)]
346 let data_space = match config.deterministic_packet_numbers {
347 true => PacketSpace::new_deterministic(now, SpaceId::Data),
348 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
349 };
350 #[cfg(not(test))]
351 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
352 let state = State::handshake(state::Handshake {
353 rem_cid_set: side.is_server(),
354 expected_token: Bytes::new(),
355 client_hello: None,
356 allow_server_migration: side.is_client(),
357 });
358 let local_cid_state = FxHashMap::from_iter([(
359 PathId::ZERO,
360 CidState::new(
361 cid_gen.cid_len(),
362 cid_gen.cid_lifetime(),
363 now,
364 if pref_addr_cid.is_some() { 2 } else { 1 },
365 ),
366 )]);
367
368 let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
369 path.open = true;
371 let mut this = Self {
372 endpoint_config,
373 crypto,
374 handshake_cid: loc_cid,
375 rem_handshake_cid: rem_cid,
376 local_cid_state,
377 paths: BTreeMap::from_iter([(
378 PathId::ZERO,
379 PathState {
380 data: path,
381 prev: None,
382 },
383 )]),
384 path_counter: 0,
385 allow_mtud,
386 state,
387 side: connection_side,
388 zero_rtt_enabled: false,
389 zero_rtt_crypto: None,
390 key_phase: false,
391 key_phase_size: rng.random_range(10..1000),
398 peer_params: TransportParameters::default(),
399 orig_rem_cid: rem_cid,
400 initial_dst_cid: init_cid,
401 retry_src_cid: None,
402 events: VecDeque::new(),
403 endpoint_events: VecDeque::new(),
404 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
405 spin: false,
406 spaces: [initial_space, handshake_space, data_space],
407 highest_space: SpaceId::Initial,
408 prev_crypto: None,
409 next_crypto: None,
410 accepted_0rtt: false,
411 permit_idle_reset: true,
412 idle_timeout: match config.max_idle_timeout {
413 None | Some(VarInt(0)) => None,
414 Some(dur) => Some(Duration::from_millis(dur.0)),
415 },
416 timers: TimerTable::default(),
417 authentication_failures: 0,
418 close: false,
419
420 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
421 &TransportParameters::default(),
422 )),
423
424 app_limited: false,
425 receiving_ecn: false,
426 total_authed_packets: 0,
427
428 next_observed_addr_seq_no: 0u32.into(),
429
430 streams: StreamsState::new(
431 side,
432 config.max_concurrent_uni_streams,
433 config.max_concurrent_bidi_streams,
434 config.send_window,
435 config.receive_window,
436 config.stream_receive_window,
437 ),
438 datagrams: DatagramState::default(),
439 config,
440 rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
441 rng,
442 stats: ConnectionStats::default(),
443 path_stats: Default::default(),
444 version,
445
446 max_concurrent_paths: NonZeroU32::MIN,
448 local_max_path_id: PathId::ZERO,
449 remote_max_path_id: PathId::ZERO,
450 max_path_id_with_cids: PathId::ZERO,
451 abandoned_paths: Default::default(),
452
453 iroh_hp: Default::default(),
455 qlog,
456 };
457 if path_validated {
458 this.on_path_validated(PathId::ZERO);
459 }
460 if side.is_client() {
461 this.write_crypto();
463 this.init_0rtt(now);
464 }
465 this.qlog
466 .emit_tuple_assigned(PathId::ZERO, network_path, now);
467 this
468 }
469
470 #[must_use]
478 pub fn poll_timeout(&mut self) -> Option<Instant> {
479 self.timers.peek()
480 }
481
482 #[must_use]
488 pub fn poll(&mut self) -> Option<Event> {
489 if let Some(x) = self.events.pop_front() {
490 return Some(x);
491 }
492
493 if let Some(event) = self.streams.poll() {
494 return Some(Event::Stream(event));
495 }
496
497 if let Some(reason) = self.state.take_error() {
498 return Some(Event::ConnectionLost { reason });
499 }
500
501 None
502 }
503
504 #[must_use]
506 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
507 self.endpoint_events.pop_front().map(EndpointEvent)
508 }
509
510 #[must_use]
512 pub fn streams(&mut self) -> Streams<'_> {
513 Streams {
514 state: &mut self.streams,
515 conn_state: &self.state,
516 }
517 }
518
519 #[must_use]
521 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
522 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
523 RecvStream {
524 id,
525 state: &mut self.streams,
526 pending: &mut self.spaces[SpaceId::Data].pending,
527 }
528 }
529
530 #[must_use]
532 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
533 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
534 SendStream {
535 id,
536 state: &mut self.streams,
537 pending: &mut self.spaces[SpaceId::Data].pending,
538 conn_state: &self.state,
539 }
540 }
541
542 pub fn open_path_ensure(
559 &mut self,
560 network_path: FourTuple,
561 initial_status: PathStatus,
562 now: Instant,
563 ) -> Result<(PathId, bool), PathError> {
564 Ok(
565 match self
566 .paths
567 .iter()
568 .find(|(_id, path)| network_path.is_probably_same_path(&path.data.network_path))
569 {
570 Some((path_id, _state)) => (*path_id, true),
571 None => (self.open_path(network_path, initial_status, now)?, false),
572 },
573 )
574 }
575
576 pub fn open_path(
581 &mut self,
582 network_path: FourTuple,
583 initial_status: PathStatus,
584 now: Instant,
585 ) -> Result<PathId, PathError> {
586 if !self.is_multipath_negotiated() {
587 return Err(PathError::MultipathNotNegotiated);
588 }
589 if self.side().is_server() {
590 return Err(PathError::ServerSideNotAllowed);
591 }
592
593 let max_abandoned = self.abandoned_paths.iter().max().copied();
594 let max_used = self.paths.keys().last().copied();
595 let path_id = max_abandoned
596 .max(max_used)
597 .unwrap_or(PathId::ZERO)
598 .saturating_add(1u8);
599
600 if Some(path_id) > self.max_path_id() {
601 return Err(PathError::MaxPathIdReached);
602 }
603 if path_id > self.remote_max_path_id {
604 self.spaces[SpaceId::Data].pending.paths_blocked = true;
605 return Err(PathError::MaxPathIdReached);
606 }
607 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
608 self.spaces[SpaceId::Data]
609 .pending
610 .path_cids_blocked
611 .push(path_id);
612 return Err(PathError::RemoteCidsExhausted);
613 }
614
615 let path = self.ensure_path(path_id, network_path, now, None);
616 path.status.local_update(initial_status);
617
618 Ok(path_id)
619 }
620
621 pub fn close_path(
627 &mut self,
628 now: Instant,
629 path_id: PathId,
630 error_code: VarInt,
631 ) -> Result<(), ClosePathError> {
632 if self.abandoned_paths.contains(&path_id)
633 || Some(path_id) > self.max_path_id()
634 || !self.paths.contains_key(&path_id)
635 {
636 return Err(ClosePathError::ClosedPath);
637 }
638 if self
639 .paths
640 .iter()
641 .any(|(id, path)| {
643 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
644 })
645 .not()
646 {
647 return Err(ClosePathError::LastOpenPath);
648 }
649
650 self.spaces[SpaceId::Data]
652 .pending
653 .path_abandon
654 .insert(path_id, error_code.into());
655
656 let pending_space = &mut self.spaces[SpaceId::Data].pending;
658 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
659 pending_space.path_cids_blocked.retain(|&id| id != path_id);
660 pending_space.path_status.retain(|&id| id != path_id);
661
662 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
664 for sent_packet in space.sent_packets.values_mut() {
665 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
666 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
667 retransmits.path_cids_blocked.retain(|&id| id != path_id);
668 retransmits.path_status.retain(|&id| id != path_id);
669 }
670 }
671 }
672
673 self.rem_cids.remove(&path_id);
679 self.endpoint_events
680 .push_back(EndpointEventInner::RetireResetToken(path_id));
681
682 let pto = self.pto_max_path(SpaceId::Data, false);
683
684 let path = self.paths.get_mut(&path_id).expect("checked above");
685
686 path.data.last_allowed_receive = Some(now + 3 * pto);
688 self.abandoned_paths.insert(path_id);
689
690 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
691
692 self.timers.set(
697 Timer::PerPath(path_id, PathTimer::DiscardPath),
698 now + 6 * pto,
699 self.qlog.with_time(now),
700 );
701 Ok(())
702 }
703
704 #[track_caller]
708 fn path_data(&self, path_id: PathId) -> &PathData {
709 if let Some(data) = self.paths.get(&path_id) {
710 &data.data
711 } else {
712 panic!(
713 "unknown path: {path_id}, currently known paths: {:?}",
714 self.paths.keys().collect::<Vec<_>>()
715 );
716 }
717 }
718
719 fn path(&self, path_id: PathId) -> Option<&PathData> {
721 self.paths.get(&path_id).map(|path_state| &path_state.data)
722 }
723
724 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
726 self.paths
727 .get_mut(&path_id)
728 .map(|path_state| &mut path_state.data)
729 }
730
731 pub fn paths(&self) -> Vec<PathId> {
735 self.paths.keys().copied().collect()
736 }
737
738 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
740 self.path(path_id)
741 .map(PathData::local_status)
742 .ok_or(ClosedPath { _private: () })
743 }
744
745 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
747 self.path(path_id)
748 .map(|path| path.network_path)
749 .ok_or(ClosedPath { _private: () })
750 }
751
752 pub fn set_path_status(
756 &mut self,
757 path_id: PathId,
758 status: PathStatus,
759 ) -> Result<PathStatus, SetPathStatusError> {
760 if !self.is_multipath_negotiated() {
761 return Err(SetPathStatusError::MultipathNotNegotiated);
762 }
763 let path = self
764 .path_mut(path_id)
765 .ok_or(SetPathStatusError::ClosedPath)?;
766 let prev = match path.status.local_update(status) {
767 Some(prev) => {
768 self.spaces[SpaceId::Data]
769 .pending
770 .path_status
771 .insert(path_id);
772 prev
773 }
774 None => path.local_status(),
775 };
776 Ok(prev)
777 }
778
779 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
784 self.path(path_id).and_then(|path| path.remote_status())
785 }
786
787 pub fn set_path_max_idle_timeout(
793 &mut self,
794 path_id: PathId,
795 timeout: Option<Duration>,
796 ) -> Result<Option<Duration>, ClosedPath> {
797 let path = self
798 .paths
799 .get_mut(&path_id)
800 .ok_or(ClosedPath { _private: () })?;
801 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
802 }
803
804 pub fn set_path_keep_alive_interval(
810 &mut self,
811 path_id: PathId,
812 interval: Option<Duration>,
813 ) -> Result<Option<Duration>, ClosedPath> {
814 let path = self
815 .paths
816 .get_mut(&path_id)
817 .ok_or(ClosedPath { _private: () })?;
818 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
819 }
820
821 #[track_caller]
825 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
826 &mut self.paths.get_mut(&path_id).expect("known path").data
827 }
828
829 fn find_validated_path_on_network_path(
833 &self,
834 network_path: FourTuple,
835 ) -> Option<(&PathId, &PathState)> {
836 self.paths.iter().find(|(path_id, path_state)| {
837 path_state.data.validated
838 && network_path.is_probably_same_path(&path_state.data.network_path)
840 && !self.abandoned_paths.contains(path_id)
841 })
842 }
846
847 fn ensure_path(
848 &mut self,
849 path_id: PathId,
850 network_path: FourTuple,
851 now: Instant,
852 pn: Option<u64>,
853 ) -> &mut PathData {
854 let valid_path = self.find_validated_path_on_network_path(network_path);
855 let validated = valid_path.is_some();
856 let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
857 let vacant_entry = match self.paths.entry(path_id) {
858 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
859 btree_map::Entry::Occupied(occupied_entry) => {
860 return &mut occupied_entry.into_mut().data;
861 }
862 };
863
864 debug!(%validated, %path_id, %network_path, "path added");
865 let peer_max_udp_payload_size =
866 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
867 self.path_counter = self.path_counter.wrapping_add(1);
868 let mut data = PathData::new(
869 network_path,
870 self.allow_mtud,
871 Some(peer_max_udp_payload_size),
872 self.path_counter,
873 now,
874 &self.config,
875 );
876
877 data.validated = validated;
878 if let Some(initial_rtt) = initial_rtt {
879 data.rtt.reset_initial_rtt(initial_rtt);
880 }
881
882 let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
883 self.timers.set(
884 Timer::PerPath(path_id, PathTimer::PathOpen),
885 now + 3 * pto,
886 self.qlog.with_time(now),
887 );
888
889 data.send_new_challenge = true;
892
893 let path = vacant_entry.insert(PathState { data, prev: None });
894
895 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
896 if let Some(pn) = pn {
897 pn_space.dedup.insert(pn);
898 }
899 self.spaces[SpaceId::Data]
900 .number_spaces
901 .insert(path_id, pn_space);
902 self.qlog.emit_tuple_assigned(path_id, network_path, now);
903 &mut path.data
904 }
905
/// Builds the next batch of outgoing UDP datagrams and writes them to `buf`.
///
/// - `now`: current time; passed on to timers, the congestion controller,
///   MTU discovery and qlog.
/// - `max_datagrams`: upper bound on the datagrams produced by one call; it
///   is reduced to one unless `enable_segmentation_offload` is set.
/// - `buf`: output buffer for the datagrams.
///
/// All datagrams of one call belong to a single path: once a datagram has
/// been written the loop stops instead of moving on to another path.
///
/// Returns `None` when nothing was written (connection drained, close packet
/// already sent, nothing sendable, or sending blocked) or when
/// `PacketBuilder::new` returns `None`. Otherwise the returned [`Transmit`]
/// describes destination, total size, ECN marking, segment size and source
/// IP of what was written.
906 #[must_use]
916 pub fn poll_transmit(
917 &mut self,
918 now: Instant,
919 max_datagrams: NonZeroUsize,
920 buf: &mut Vec<u8>,
921 ) -> Option<Transmit> {
// A pending `iroh_hp` server-side probe is sent before anything else: the
// datagram consists solely of a random 8-byte token.
922 if let Some(probing) = self
923 .iroh_hp
924 .server_side_mut()
925 .ok()
926 .and_then(iroh_hp::ServerState::next_probe)
927 {
928 let destination = probing.remote();
929 trace!(%destination, "RAND_DATA packet");
930 let token: u64 = self.rng.random();
931 buf.put_u64(token);
932 probing.finish(token);
933 return Some(Transmit {
934 destination,
935 ecn: None,
936 size: 8,
937 segment_size: None,
938 src_ip: None,
939 });
940 }
941
// Without segmentation offload only a single datagram is produced per call.
942 let max_datagrams = match self.config.enable_segmentation_offload {
943 false => NonZeroUsize::MIN,
944 true => max_datagrams,
945 };
946
// `close` is true while a close packet still has to be sent. Nothing is
// sent when drained, or when closed/draining with `self.close` cleared.
947 let close = match self.state.as_type() {
966 StateType::Drained => {
967 self.app_limited = true;
968 return None;
969 }
970 StateType::Draining | StateType::Closed => {
971 if !self.close {
974 self.app_limited = true;
975 return None;
976 }
977 true
978 }
979 _ => false,
980 };
981
// With an ACK frequency config present, (re)compute the pending
// `ack_frequency` flag using the smallest RTT over all paths.
982 if let Some(config) = &self.config.ack_frequency_config {
984 let rtt = self
985 .paths
986 .values()
987 .map(|p| p.data.rtt.get())
988 .min()
989 .expect("one path exists");
990 self.spaces[SpaceId::Data].pending.ack_frequency = self
991 .ack_frequency
992 .should_send_ack_frequency(rtt, config, &self.peer_params)
993 && self.highest_space == SpaceId::Data
994 && self.peer_supports_ack_frequency();
995 }
996
// Whether a further packet may be coalesced into the current datagram; it
// is cleared once a short-header packet has been built.
997 let mut coalesce = true;
999
// Padding requirement accumulated for the current datagram.
1000 let mut pad_datagram = PadDatagram::No;
1003
// Set when `path_congestion_check` reported a path as blocked; this keeps
// `app_limited` false afterwards.
1004 let mut congestion_blocked = false;
1008
// Number of the most recently built packet; reported to the congestion
// controller after the loop.
1009 let mut last_packet_number = None;
1011
// Start with the lowest path id.
1012 let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
1013
// True when some validated path with local status `Available` has a remote
// CID queue. In that case `Backup` paths only send when
// `can_send.path_exclusive` is set (or a loss probe is due).
1014 let have_available_path = self.paths.iter().any(|(id, path)| {
1017 path.data.validated
1018 && path.data.local_status() == PathStatus::Available
1019 && self.rem_cids.contains_key(id)
1020 });
1021
// The segment size follows the current MTU of the path being served.
1022 let mut transmit = TransmitBuf::new(
1024 buf,
1025 max_datagrams,
1026 self.path_data(path_id).current_mtu().into(),
1027 );
1028 if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1029 return Some(challenge);
1030 }
// Path zero starts in the Initial space; any other path only uses Data.
1031 let mut space_id = match path_id {
1032 PathId::ZERO => SpaceId::Initial,
1033 _ => SpaceId::Data,
1034 };
1035
// Each iteration either builds one packet or advances to the next space or
// path.
1036 loop {
// No remote CID queue for this path: abandon the path unless that already
// happened, then continue with the next higher path id, if any.
1037 let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1039 let err = PathError::RemoteCidsExhausted;
1040 if !self.abandoned_paths.contains(&path_id) {
1041 debug!(?err, %path_id, "no active CID for path");
1042 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1043 id: path_id,
1044 error: err,
1045 }));
// A failure to close the path is deliberately ignored here.
1046 self.close_path(
1050 now,
1051 path_id,
1052 TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1053 )
1054 .ok();
1055 self.spaces[SpaceId::Data]
1056 .pending
1057 .path_cids_blocked
1058 .push(path_id);
1059 } else {
1060 trace!(%path_id, "remote CIDs retired for abandoned path");
1061 }
1062
1063 match self.paths.keys().find(|&&next| next > path_id) {
1064 Some(next_path_id) => {
1065 path_id = *next_path_id;
1067 space_id = SpaceId::Data;
1068
1069 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1071 if let Some(challenge) =
1072 self.send_prev_path_challenge(now, &mut transmit, path_id)
1073 {
1074 return Some(challenge);
1075 }
1076
1077 continue;
1078 }
1079 None => {
1080 trace!(
1082 ?space_id,
1083 %path_id,
1084 "no CIDs to send on path, no more paths"
1085 );
1086 break;
1087 }
1088 }
1089 };
1090
// The packet has to fit into what is left of the current datagram, or into
// a whole segment when the datagram has no room left.
1091 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1094 transmit.datagram_remaining_mut()
1096 } else {
1097 transmit.segment_size()
1099 };
1100 let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1101 let path_should_send = {
1102 let path_exclusive_only = space_id == SpaceId::Data
1103 && have_available_path
1104 && self.path_data(path_id).local_status() == PathStatus::Backup;
1105 let path_should_send = if path_exclusive_only {
1106 can_send.path_exclusive
1107 } else {
1108 !can_send.is_empty()
1109 };
1110 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1111 path_should_send || needs_loss_probe
1112 };
1113
// Nothing to send in a space below Data: try the next space, same path.
1114 if !path_should_send && space_id < SpaceId::Data {
1115 if self.spaces[space_id].crypto.is_some() {
1116 trace!(?space_id, %path_id, "nothing to send in space");
1117 }
1118 space_id = space_id.next();
1119 continue;
1120 }
1121
// The congestion check only runs when a new datagram would be needed.
1122 let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1123 self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1125 } else {
1126 PathBlocked::No
1127 };
1128 if send_blocked != PathBlocked::No {
1129 trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1130 congestion_blocked = true;
1131 }
// Blocked in a space below Data: still give the next space a chance.
1132 if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1133 space_id = space_id.next();
1136 continue;
1137 }
// Nothing more can be sent on this path. If datagrams were already written
// the transmit is finished; otherwise move on to the next path.
1138 if !path_should_send || send_blocked != PathBlocked::No {
1139 if transmit.num_datagrams() > 0 {
1144 break;
1145 }
1146
1147 match self.paths.keys().find(|&&next| next > path_id) {
1148 Some(next_path_id) => {
1149 trace!(
1151 ?space_id,
1152 %path_id,
1153 %next_path_id,
1154 "nothing to send on path"
1155 );
1156 path_id = *next_path_id;
1157 space_id = SpaceId::Data;
1158
1159 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1161 if let Some(challenge) =
1162 self.send_prev_path_challenge(now, &mut transmit, path_id)
1163 {
1164 return Some(challenge);
1165 }
1166
1167 continue;
1168 }
1169 None => {
1170 trace!(
1172 ?space_id,
1173 %path_id,
1174 next_path_id=?None::<PathId>,
1175 "nothing to send on path"
1176 );
1177 break;
1178 }
1179 }
1180 }
1181
// No room left in the current datagram (or none started yet): start a new
// one unless the datagram limit has been reached.
1182 if transmit.datagram_remaining_mut() == 0 {
1184 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1185 break;
1187 }
1188
1189 match self.spaces[space_id].for_path(path_id).loss_probes {
1190 0 => transmit.start_new_datagram(),
// Loss probe due: queue probe data, consume one probe and limit the
// datagram to at most INITIAL_MTU bytes.
1191 _ => {
1192 let request_immediate_ack =
1194 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1195 self.spaces[space_id].maybe_queue_probe(
1196 path_id,
1197 request_immediate_ack,
1198 &self.streams,
1199 );
1200
1201 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1202
1203 transmit.start_new_datagram_with_size(std::cmp::min(
1207 usize::from(INITIAL_MTU),
1208 transmit.segment_size(),
1209 ));
1210 }
1211 }
1212 trace!(count = transmit.num_datagrams(), "new datagram started");
1213 coalesce = true;
1214 pad_datagram = PadDatagram::No;
1215 }
1216
// When appending to a datagram that already holds data, at least
// MIN_PACKET_SPACE bytes must remain.
1217 if transmit.datagram_start_offset() < transmit.len() {
1220 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1221 }
1222
// A client discards the Initial space (if it still has keys) when it is
// about to send in the Handshake space.
1223 if self.spaces[SpaceId::Initial].crypto.is_some()
1228 && space_id == SpaceId::Handshake
1229 && self.side.is_client()
1230 {
1231 self.discard_space(now, SpaceId::Initial);
1234 }
1235 if let Some(ref mut prev) = self.prev_crypto {
1236 prev.update_unacked = false;
1237 }
1238
// `?` ends the whole call with `None` when no packet can be started.
1239 let mut builder = PacketBuilder::new(
1240 now,
1241 space_id,
1242 path_id,
1243 remote_cid,
1244 &mut transmit,
1245 can_send.other,
1246 self,
1247 )?;
1248 last_packet_number = Some(builder.exact_number);
1249 coalesce = coalesce && !builder.short_header;
1250
// Datagrams with an Initial packet are padded to the minimum MTU when sent
// by a client or when `can_send.other` is set.
1251 if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1252 pad_datagram |= PadDatagram::ToMinMtu;
1254 }
1255 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1256 pad_datagram |= PadDatagram::ToSegmentSize;
1257 }
1258
// Closing: write pending ACKs for every path of this space first, then the
// close frame if it still fits.
1259 if can_send.close {
1260 trace!("sending CONNECTION_CLOSE");
1261 let is_multipath_negotiated = self.is_multipath_negotiated();
// The ids are collected first so that `self.spaces` can be borrowed mutably
// inside the loop body.
1266 for path_id in self.spaces[space_id]
1267 .number_spaces
1268 .iter()
1269 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1270 .map(|(&path_id, _)| path_id)
1271 .collect::<Vec<_>>()
1272 {
1273 Self::populate_acks(
1274 now,
1275 self.receiving_ecn,
1276 path_id,
1277 space_id,
1278 &mut self.spaces[space_id],
1279 is_multipath_negotiated,
1280 &mut builder,
1281 &mut self.stats.frame_tx,
1282 );
1283 }
1284
1285 debug_assert!(
1289 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1290 "ACKs should leave space for ConnectionClose"
1291 );
1292 let stats = &mut self.stats.frame_tx;
// In release builds the close frame is skipped when it does not fit.
1293 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1294 let max_frame_size = builder.frame_space_remaining();
// Outside the Data space only transport-layer reasons are sent as they
// are; any other reason is replaced by a generic APPLICATION_ERROR.
1295 let close: Close = match self.state.as_type() {
1296 StateType::Closed => {
1297 let reason: Close =
1298 self.state.as_closed().expect("checked").clone().into();
1299 if space_id == SpaceId::Data || reason.is_transport_layer() {
1300 reason
1301 } else {
1302 TransportError::APPLICATION_ERROR("").into()
1303 }
1304 }
1305 StateType::Draining => TransportError::NO_ERROR("").into(),
1306 _ => unreachable!(
1307 "tried to make a close packet when the connection wasn't closed"
1308 ),
1309 };
1310 builder.encode(close.encoder(max_frame_size), stats);
1311 }
1312 builder.finish_and_track(now, self, path_id, pad_datagram);
// Once the close went out in the highest space, no further close packet is
// needed; otherwise repeat for the next space.
1313 if space_id == self.highest_space {
1314 self.close = false;
1317 break;
1319 } else {
1320 space_id = space_id.next();
1324 continue;
1325 }
1326 }
1327
// A queued off-path PATH_RESPONSE is sent alone, in the first datagram
// only, padded to the minimum MTU and addressed to the network path it was
// queued for.
1328 if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1331 let path = self.path_data_mut(path_id);
1332 if let Some((token, network_path)) =
1333 path.path_responses.pop_off_path(path.network_path)
1334 {
1335 let response = frame::PathResponse(token);
1339 trace!(%response, "(off-path)");
1340 builder.encode(response, &mut self.stats.frame_tx);
1341 builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
1342 self.stats.udp_tx.on_sent(1, transmit.len());
1343 return Some(Transmit {
1344 destination: network_path.remote,
1345 size: transmit.len(),
1346 ecn: None,
1347 segment_size: None,
1348 src_ip: network_path.local_ip,
1349 });
1350 }
1351 }
1352
// Fill the packet with frames.
1353 let path_exclusive_only =
1354 have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup;
1355 self.populate_packet(now, space_id, path_id, path_exclusive_only, &mut builder);
1356
// Debug-only check: trips when only ACKs were written although
// `can_send.other` was set and `can_send.acks` was not, with a full-MTU
// segment and no outgoing datagrams queued.
1357 debug_assert!(
1364 !(builder.sent_frames().is_ack_only(&self.streams)
1365 && !can_send.acks
1366 && can_send.other
1367 && builder.buf.segment_size()
1368 == self.path_data(path_id).current_mtu() as usize
1369 && self.datagrams.outgoing.is_empty()),
1370 "SendableFrames was {can_send:?}, but only ACKs have been written"
1371 );
1372 if builder.sent_frames().requires_padding {
1373 pad_datagram |= PadDatagram::ToMinMtu;
1374 }
1375
// For every path an ACK was written for, mark the pending ACKs as sent and
// stop that path's MaxAckDelay timer.
1376 for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
1377 self.spaces[space_id]
1378 .for_path(*path_id)
1379 .pending_acks
1380 .acks_sent();
1381 self.timers.stop(
1382 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1383 self.qlog.with_time(now),
1384 );
1385 }
1386
// Finish the packet. It is finished without padding when another packet
// will be coalesced into the same datagram: coalescing must still be
// allowed, more than MIN_PACKET_SPACE must remain after this packet, and
// some space must have something to send.
1387 if coalesce
1395 && builder
1396 .buf
1397 .datagram_remaining_mut()
1398 .saturating_sub(builder.predict_packet_end())
1399 > MIN_PACKET_SPACE
1400 && self
1401 .next_send_space(space_id, path_id, builder.buf, close)
1402 .is_some()
1403 {
1404 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1407 } else {
// This packet ends the datagram. From the second datagram on, with no
// padding otherwise required, the batch ends here when more than
// MAX_PADDING bytes of padding would be needed to fill the segment;
// otherwise the datagram is padded to the segment size.
1408 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1411 const MAX_PADDING: usize = 32;
1419 if builder.buf.datagram_remaining_mut()
1420 > builder.predict_packet_end() + MAX_PADDING
1421 {
1422 trace!(
1423 "GSO truncated by demand for {} padding bytes",
1424 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1425 );
1426 builder.finish_and_track(now, self, path_id, PadDatagram::No);
1427 break;
1428 }
1429
1430 builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
1433 } else {
1434 builder.finish_and_track(now, self, path_id, pad_datagram);
1435 }
1436 if transmit.num_datagrams() == 1 {
1437 transmit.clip_datagram_size();
1438 }
1439 }
1440 }
1441
// Report everything written in this call to the congestion controller of
// the path last served, using the number of the last packet built.
1442 if let Some(last_packet_number) = last_packet_number {
1443 self.path_data_mut(path_id).congestion.on_sent(
1446 now,
1447 transmit.len() as u64,
1448 last_packet_number,
1449 );
1450 }
1451
1452 self.qlog.emit_recovery_metrics(
1453 path_id,
1454 &mut self.paths.get_mut(&path_id).unwrap().data,
1455 now,
1456 );
1457
1458 self.app_limited = transmit.is_empty() && !congestion_blocked;
1459
// With nothing else to send on an established connection, look for a path
// that wants an MTU probe, beginning again at the lowest path id.
1460 if transmit.is_empty() && self.state.is_established() {
1462 let space_id = SpaceId::Data;
1464 path_id = *self.paths.first_key_value().expect("one path must exist").0;
1465 let probe_data = loop {
// A path is probed when it has a remote CID queue, is validated, is not
// currently validating, is not abandoned, and MTU discovery returns a
// probe size for it.
1466 let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1472 let eligible = self.path_data(path_id).validated
1473 && !self.path_data(path_id).is_validating_path()
1474 && !self.abandoned_paths.contains(&path_id);
1475 let probe_size = eligible
1476 .then(|| {
1477 let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1478 self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1479 })
1480 .flatten();
1481 match (active_cid, probe_size) {
1482 (Some(active_cid), Some(probe_size)) => {
1483 break Some((active_cid, probe_size));
1485 }
1486 _ => {
1487 match self.paths.keys().find(|&&next| next > path_id) {
1489 Some(next) => {
1490 path_id = *next;
1491 continue;
1492 }
1493 None => break None,
1494 }
1495 }
1496 }
1497 };
// The probe is a single datagram of exactly `probe_size` bytes carrying a
// PING, plus an IMMEDIATE_ACK when the peer supports ACK frequency.
1498 if let Some((active_cid, probe_size)) = probe_data {
1499 debug_assert_eq!(transmit.num_datagrams(), 0);
1501 transmit.start_new_datagram_with_size(probe_size as usize);
1502
1503 let mut builder = PacketBuilder::new(
1504 now,
1505 space_id,
1506 path_id,
1507 active_cid,
1508 &mut transmit,
1509 true,
1510 self,
1511 )?;
1512
1513 trace!(?probe_size, "writing MTUD probe");
1515 trace!("PING");
1516 builder.encode(frame::Ping, &mut self.stats.frame_tx);
1517
1518 if self.peer_supports_ack_frequency() {
1520 trace!("IMMEDIATE_ACK");
1521 builder.encode(frame::ImmediateAck, &mut self.stats.frame_tx);
1522 }
1523
1524 builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));
1525
1526 self.path_stats
1527 .entry(path_id)
1528 .or_default()
1529 .sent_plpmtud_probes += 1;
1530 }
1531 }
1532
1533 if transmit.is_empty() {
1534 return None;
1535 }
1536
// Everything written goes to the network path of the path last served.
1537 let network_path = self.path_data(path_id).network_path;
1538 trace!(
1539 segment_size = transmit.segment_size(),
1540 last_datagram_len = transmit.len() % transmit.segment_size(),
1541 %network_path,
1542 "sending {} bytes in {} datagrams",
1543 transmit.len(),
1544 transmit.num_datagrams()
1545 );
1546 self.path_data_mut(path_id)
1547 .inc_total_sent(transmit.len() as u64);
1548
1549 self.stats
1550 .udp_tx
1551 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1552
// A segment size is only reported when more than one datagram was written.
1553 Some(Transmit {
1554 destination: network_path.remote,
1555 size: transmit.len(),
1556 ecn: if self.path_data(path_id).sending_ecn {
1557 Some(EcnCodepoint::Ect0)
1558 } else {
1559 None
1560 },
1561 segment_size: match transmit.num_datagrams() {
1562 1 => None,
1563 _ => Some(transmit.segment_size()),
1564 },
1565 src_ip: network_path.local_ip,
1566 })
1567 }
1568
1569 fn next_send_space(
1574 &mut self,
1575 current_space_id: SpaceId,
1576 path_id: PathId,
1577 buf: &TransmitBuf<'_>,
1578 close: bool,
1579 ) -> Option<SpaceId> {
1580 let mut space_id = current_space_id;
1587 loop {
1588 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1589 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1590 return Some(space_id);
1591 }
1592 space_id = match space_id {
1593 SpaceId::Initial => SpaceId::Handshake,
1594 SpaceId::Handshake => SpaceId::Data,
1595 SpaceId::Data => break,
1596 }
1597 }
1598 None
1599 }
1600
1601 fn path_congestion_check(
1603 &mut self,
1604 space_id: SpaceId,
1605 path_id: PathId,
1606 transmit: &TransmitBuf<'_>,
1607 can_send: &SendableFrames,
1608 now: Instant,
1609 ) -> PathBlocked {
1610 if self.side().is_server()
1616 && self
1617 .path_data(path_id)
1618 .anti_amplification_blocked(transmit.len() as u64 + 1)
1619 {
1620 trace!(?space_id, %path_id, "blocked by anti-amplification");
1621 return PathBlocked::AntiAmplification;
1622 }
1623
1624 let bytes_to_send = transmit.segment_size() as u64;
1627 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1628
1629 if can_send.other && !need_loss_probe && !can_send.close {
1630 let path = self.path_data(path_id);
1631 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1632 trace!(?space_id, %path_id, "blocked by congestion control");
1633 return PathBlocked::Congestion;
1634 }
1635 }
1636
1637 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1639 self.timers.set(
1640 Timer::PerPath(path_id, PathTimer::Pacing),
1641 delay,
1642 self.qlog.with_time(now),
1643 );
1644 trace!(?space_id, %path_id, "blocked by pacing");
1647 return PathBlocked::Pacing;
1648 }
1649
1650 PathBlocked::No
1651 }
1652
1653 fn send_prev_path_challenge(
1658 &mut self,
1659 now: Instant,
1660 buf: &mut TransmitBuf<'_>,
1661 path_id: PathId,
1662 ) -> Option<Transmit> {
1663 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1664 if !prev_path.send_new_challenge {
1667 return None;
1668 };
1669 prev_path.send_new_challenge = false;
1670 let network_path = prev_path.network_path;
1671 let token = self.rng.random();
1672 let info = paths::SentChallengeInfo {
1673 sent_instant: now,
1674 network_path,
1675 };
1676 prev_path.challenges_sent.insert(token, info);
1677 debug_assert_eq!(
1678 self.highest_space,
1679 SpaceId::Data,
1680 "PATH_CHALLENGE queued without 1-RTT keys"
1681 );
1682 buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1683
1684 debug_assert_eq!(buf.datagram_start_offset(), 0);
1690 let mut builder =
1691 PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
1692 let challenge = frame::PathChallenge(token);
1693 trace!(%challenge, "validating previous path");
1694 builder.encode(challenge, &mut self.stats.frame_tx);
1695
1696 builder.pad_to(MIN_INITIAL_SIZE);
1701
1702 builder.finish(self, now);
1703 self.stats.udp_tx.on_sent(1, buf.len());
1704
1705 Some(Transmit {
1706 destination: network_path.remote,
1707 size: buf.len(),
1708 ecn: None,
1709 segment_size: None,
1710 src_ip: network_path.local_ip,
1711 })
1712 }
1713
1714 fn space_can_send(
1719 &mut self,
1720 space_id: SpaceId,
1721 path_id: PathId,
1722 packet_size: usize,
1723 close: bool,
1724 ) -> SendableFrames {
1725 let pn = self.spaces[SpaceId::Data]
1726 .for_path(path_id)
1727 .peek_tx_number();
1728 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1729 if self.spaces[space_id].crypto.is_none()
1730 && (space_id != SpaceId::Data
1731 || self.zero_rtt_crypto.is_none()
1732 || self.side.is_server())
1733 {
1734 return SendableFrames::empty();
1736 }
1737 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1738 if space_id == SpaceId::Data {
1739 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1740 }
1741
1742 can_send.close = close && self.spaces[space_id].crypto.is_some();
1743
1744 can_send
1745 }
1746
1747 pub fn handle_event(&mut self, event: ConnectionEvent) {
1753 use ConnectionEventInner::*;
1754 match event.0 {
1755 Datagram(DatagramConnectionEvent {
1756 now,
1757 network_path,
1758 path_id,
1759 ecn,
1760 first_decode,
1761 remaining,
1762 }) => {
1763 let span = trace_span!("pkt", %path_id);
1764 let _guard = span.enter();
1765
1766 if self.update_network_path_or_discard(network_path, path_id) {
1767 return;
1769 }
1770
1771 let was_anti_amplification_blocked = self
1772 .path(path_id)
1773 .map(|path| path.anti_amplification_blocked(1))
1774 .unwrap_or(true); self.stats.udp_rx.datagrams += 1;
1778 self.stats.udp_rx.bytes += first_decode.len() as u64;
1779 let data_len = first_decode.len();
1780
1781 self.handle_decode(now, network_path, path_id, ecn, first_decode);
1782 if let Some(path) = self.path_mut(path_id) {
1787 path.inc_total_recvd(data_len as u64);
1788 }
1789
1790 if let Some(data) = remaining {
1791 self.stats.udp_rx.bytes += data.len() as u64;
1792 self.handle_coalesced(now, network_path, path_id, ecn, data);
1793 }
1794
1795 if let Some(path) = self.paths.get_mut(&path_id) {
1796 self.qlog
1797 .emit_recovery_metrics(path_id, &mut path.data, now);
1798 }
1799
1800 if was_anti_amplification_blocked {
1801 self.set_loss_detection_timer(now, path_id);
1805 }
1806 }
1807 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1808 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1809 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1810 let cid_state = self
1811 .local_cid_state
1812 .entry(path_id)
1813 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1814 cid_state.new_cids(&ids, now);
1815
1816 ids.into_iter().rev().for_each(|frame| {
1817 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1818 });
1819 self.reset_cid_retirement(now);
1821 }
1822 }
1823 }
1824
1825 fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
1830 let remote_may_migrate = self.side.remote_may_migrate(&self.state);
1831 let local_ip_may_migrate = self.side.is_client();
1832 if let Some(known_path) = self.path_mut(path_id) {
1836 if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
1837 trace!(
1838 %path_id,
1839 %network_path,
1840 %known_path.network_path,
1841 "discarding packet from unrecognized peer"
1842 );
1843 return true;
1844 }
1845
1846 if known_path.network_path.local_ip.is_some()
1847 && network_path.local_ip.is_some()
1848 && known_path.network_path.local_ip != network_path.local_ip
1849 && !local_ip_may_migrate
1850 {
1851 trace!(
1852 %path_id,
1853 %network_path,
1854 %known_path.network_path,
1855 "discarding packet sent to incorrect interface"
1856 );
1857 return true;
1858 }
1859 if let Some(local_ip) = network_path.local_ip {
1864 if known_path
1865 .network_path
1866 .local_ip
1867 .is_some_and(|ip| ip != local_ip)
1868 {
1869 debug!(
1870 %path_id,
1871 %network_path,
1872 %known_path.network_path,
1873 "path's local address seemingly migrated"
1874 );
1875 }
1876 known_path.network_path.local_ip = Some(local_ip);
1883 }
1884 }
1885 false
1886 }
1887
/// Processes every timer that `self.timers.expire_before(now, ..)` reports as expired.
///
/// Timers are popped one at a time and dispatched on their kind: connection-wide timers
/// ([`ConnTimer`]) and per-path timers ([`PathTimer`]). Per-path timers are handled inside a
/// tracing span that records the path id and the timer kind.
pub fn handle_timeout(&mut self, now: Instant) {
    while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
        trace!(?timer, at=?now, "timeout");
        match timer {
            Timer::Conn(timer) => match timer {
                ConnTimer::Close => {
                    // The close period is over: move to drained and tell the endpoint.
                    self.state.move_to_drained(None);
                    self.endpoint_events.push_back(EndpointEventInner::Drained);
                }
                ConnTimer::Idle => {
                    self.kill(ConnectionError::TimedOut);
                }
                ConnTimer::KeepAlive => {
                    trace!("sending keep-alive");
                    self.ping();
                }
                ConnTimer::KeyDiscard => {
                    // Drop both the 0-RTT keys and the previous 1-RTT keys.
                    self.zero_rtt_crypto = None;
                    self.prev_crypto = None;
                }
                ConnTimer::PushNewCid => {
                    // Handle every CID retirement that is due at `now`, stopping at the
                    // first one that lies in the future.
                    while let Some((path_id, when)) = self.next_cid_retirement() {
                        if when > now {
                            break;
                        }
                        match self.local_cid_state.get_mut(&path_id) {
                            None => error!(%path_id, "No local CID state for path"),
                            Some(cid_state) => {
                                let num_new_cid = cid_state.on_cid_timeout().into();
                                // Only request replacement CIDs while not closed.
                                if !self.state.is_closed() {
                                    trace!(
                                        "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                        cid_state.retire_prior_to()
                                    );
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::NeedIdentifiers(
                                            path_id,
                                            now,
                                            num_new_cid,
                                        ),
                                    );
                                }
                            }
                        }
                    }
                }
            },
            Timer::PerPath(path_id, timer) => {
                let span = trace_span!("per-path timer fired", %path_id, ?timer);
                let _guard = span.enter();
                match timer {
                    PathTimer::PathIdle => {
                        // Best effort: an error from closing the path is ignored.
                        self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
                            .ok();
                    }

                    PathTimer::PathKeepAlive => {
                        trace!("sending keep-alive on path");
                        // Best effort: an error (path no longer known) is ignored.
                        self.ping_path(path_id).ok();
                    }
                    PathTimer::LossDetection => {
                        self.on_loss_detection_timeout(now, path_id);
                        // NOTE(review): this unwrap panics if the path is no longer in
                        // `self.paths` — presumably the timer is never armed for such a
                        // path; confirm.
                        self.qlog.emit_recovery_metrics(
                            path_id,
                            &mut self.paths.get_mut(&path_id).unwrap().data,
                            now,
                        );
                    }
                    PathTimer::PathValidation => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("path validation failed");
                        // Fall back to the previous path data, if any was kept.
                        if let Some((_, prev)) = path.prev.take() {
                            path.data = prev;
                        }
                        // Forget outstanding challenges and do not schedule a new one.
                        path.data.challenges_sent.clear();
                        path.data.send_new_challenge = false;
                    }
                    PathTimer::PathChallengeLost => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        trace!("path challenge deemed lost");
                        // Request that a fresh challenge be sent.
                        path.data.send_new_challenge = true;
                    }
                    PathTimer::PathOpen => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        path.data.challenges_sent.clear();
                        path.data.send_new_challenge = false;
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("new path validation failed");
                        if let Err(err) = self.close_path(
                            now,
                            path_id,
                            TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
                        ) {
                            warn!(?err, "failed closing path");
                        }

                        // The application is notified even if closing the path failed.
                        self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                            id: path_id,
                            error: PathError::ValidationFailed,
                        }));
                    }
                    // Nothing to do here beyond logging.
                    PathTimer::Pacing => trace!("pacing timer expired"),
                    PathTimer::MaxAckDelay => {
                        trace!("max ack delay reached");
                        // Always applied to the Data space of this path.
                        self.spaces[SpaceId::Data]
                            .for_path(path_id)
                            .pending_acks
                            .on_max_ack_delay_timeout()
                    }
                    PathTimer::DiscardPath => {
                        // Stop all remaining timers of this path, ask the endpoint to
                        // retire every active local CID of the path, then drop its state.
                        self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                        if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
                            let (min_seq, max_seq) = loc_cid_state.active_seq();
                            for seq in min_seq..=max_seq {
                                self.endpoint_events.push_back(
                                    EndpointEventInner::RetireConnectionId(
                                        now, path_id, seq, false,
                                    ),
                                );
                            }
                        }
                        self.discard_path(path_id, now);
                    }
                }
            }
        }
    }
}
2044
2045 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2057 self.close_inner(
2058 now,
2059 Close::Application(frame::ApplicationClose { error_code, reason }),
2060 )
2061 }
2062
2063 fn close_inner(&mut self, now: Instant, reason: Close) {
2064 let was_closed = self.state.is_closed();
2065 if !was_closed {
2066 self.close_common();
2067 self.set_close_timer(now);
2068 self.close = true;
2069 self.state.move_to_closed_local(reason);
2070 }
2071 }
2072
/// Returns a [`Datagrams`] handle that borrows this connection mutably.
pub fn datagrams(&mut self) -> Datagrams<'_> {
    Datagrams { conn: self }
}
2077
/// Returns a copy (clone) of the connection statistics collected so far.
pub fn stats(&mut self) -> ConnectionStats {
    self.stats.clone()
}
2082
2083 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2085 let path = self.paths.get(&path_id)?;
2086 let stats = self.path_stats.entry(path_id).or_default();
2087 stats.rtt = path.data.rtt.get();
2088 stats.cwnd = path.data.congestion.window();
2089 stats.current_mtu = path.data.mtud.current_mtu();
2090 Some(*stats)
2091 }
2092
2093 pub fn ping(&mut self) {
2097 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2100 path_data.ping_pending = true;
2101 }
2102 }
2103
2104 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2108 let path_data = self.spaces[self.highest_space]
2109 .number_spaces
2110 .get_mut(&path)
2111 .ok_or(ClosedPath { _private: () })?;
2112 path_data.ping_pending = true;
2113 Ok(())
2114 }
2115
2116 pub fn force_key_update(&mut self) {
2120 if !self.state.is_established() {
2121 debug!("ignoring forced key update in illegal state");
2122 return;
2123 }
2124 if self.prev_crypto.is_some() {
2125 debug!("ignoring redundant forced key update");
2128 return;
2129 }
2130 self.update_keys(None, false);
2131 }
2132
/// Deprecated alias that simply forwards to [`Self::force_key_update`].
#[doc(hidden)]
#[deprecated]
pub fn initiate_key_update(&mut self) {
    self.force_key_update();
}
2139
/// Returns a shared reference to the connection's crypto session.
pub fn crypto_session(&self) -> &dyn crypto::Session {
    &*self.crypto
}
2144
/// Returns whether the connection state reports that the handshake is still in progress.
pub fn is_handshaking(&self) -> bool {
    self.state.is_handshake()
}
2152
/// Returns whether the connection state reports closed.
pub fn is_closed(&self) -> bool {
    self.state.is_closed()
}
2163
/// Returns whether the connection state reports drained.
pub fn is_drained(&self) -> bool {
    self.state.is_drained()
}
2171
/// Returns the connection's `accepted_0rtt` flag.
pub fn accepted_0rtt(&self) -> bool {
    self.accepted_0rtt
}
2178
/// Returns the connection's `zero_rtt_enabled` flag.
pub fn has_0rtt(&self) -> bool {
    self.zero_rtt_enabled
}
2183
/// Returns whether the Data space has any pending retransmittable data queued.
pub fn has_pending_retransmits(&self) -> bool {
    !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
}
2188
/// Returns which [`Side`] of the connection we are.
pub fn side(&self) -> Side {
    self.side.side()
}
2193
2194 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2196 self.path(path_id)
2197 .map(|path_data| {
2198 path_data
2199 .last_observed_addr_report
2200 .as_ref()
2201 .map(|observed| observed.socket_addr())
2202 })
2203 .ok_or(ClosedPath { _private: () })
2204 }
2205
2206 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2208 self.path(path_id).map(|d| d.rtt.get())
2209 }
2210
2211 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2213 self.path(path_id).map(|d| d.congestion.as_ref())
2214 }
2215
2216 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2221 self.streams.set_max_concurrent(dir, count);
2222 let pending = &mut self.spaces[SpaceId::Data].pending;
2225 self.streams.queue_max_stream_id(pending);
2226 }
2227
2228 pub fn set_max_concurrent_paths(
2238 &mut self,
2239 now: Instant,
2240 count: NonZeroU32,
2241 ) -> Result<(), MultipathNotNegotiated> {
2242 if !self.is_multipath_negotiated() {
2243 return Err(MultipathNotNegotiated { _private: () });
2244 }
2245 self.max_concurrent_paths = count;
2246
2247 let in_use_count = self
2248 .local_max_path_id
2249 .next()
2250 .saturating_sub(self.abandoned_paths.len() as u32)
2251 .as_u32();
2252 let extra_needed = count.get().saturating_sub(in_use_count);
2253 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2254
2255 self.set_max_path_id(now, new_max_path_id);
2256
2257 Ok(())
2258 }
2259
2260 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2262 if max_path_id <= self.local_max_path_id {
2263 return;
2264 }
2265
2266 self.local_max_path_id = max_path_id;
2267 self.spaces[SpaceId::Data].pending.max_path_id = true;
2268
2269 self.issue_first_path_cids(now);
2270 }
2271
/// Returns the concurrent-stream limit for direction `dir`, as reported by the stream state.
pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
    self.streams.max_concurrent(dir)
}
2280
/// Forwards the new send window to the stream state.
pub fn set_send_window(&mut self, send_window: u64) {
    self.streams.set_send_window(send_window);
}
2285
2286 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2288 if self.streams.set_receive_window(receive_window) {
2289 self.spaces[SpaceId::Data].pending.max_data = true;
2290 }
2291 }
2292
2293 pub fn is_multipath_negotiated(&self) -> bool {
2298 !self.is_handshaking()
2299 && self.config.max_concurrent_multipath_paths.is_some()
2300 && self.peer_params.initial_max_path_id.is_some()
2301 }
2302
2303 fn on_ack_received(
2304 &mut self,
2305 now: Instant,
2306 space: SpaceId,
2307 ack: frame::Ack,
2308 ) -> Result<(), TransportError> {
2309 let path = PathId::ZERO;
2311 self.inner_on_ack_received(now, space, path, ack)
2312 }
2313
2314 fn on_path_ack_received(
2315 &mut self,
2316 now: Instant,
2317 space: SpaceId,
2318 path_ack: frame::PathAck,
2319 ) -> Result<(), TransportError> {
2320 let (ack, path) = path_ack.into_ack();
2321 self.inner_on_ack_received(now, space, path, ack)
2322 }
2323
/// Processes an acknowledgement for packets sent in `space` on `path`.
///
/// ACKs for abandoned paths are ignored. Otherwise this updates the largest acknowledged
/// packet, checks for spurious loss detection, removes newly acknowledged packets from the
/// sent-packet tracking, informs MTU discovery, ACK frequency and congestion control, takes
/// an RTT sample where possible, runs loss detection, handles ECN feedback and finally
/// re-arms the loss detection timer.
///
/// # Errors
///
/// Returns a `PROTOCOL_VIOLATION` transport error when the ACK covers a packet number that
/// has not been sent yet, and propagates any error from `check_ack` on an acknowledged range.
fn inner_on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    path: PathId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if self.abandoned_paths.contains(&path) {
        trace!("silently ignoring PATH_ACK on abandoned path");
        return Ok(());
    }
    // The peer cannot acknowledge a packet number we have not used yet.
    if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Track whether this ACK advances the largest acknowledged packet number.
    let new_largest = {
        let space = &mut self.spaces[space].for_path(path);
        if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
            space.largest_acked_packet = Some(ack.largest);
            // Remember when that packet was sent, if we still track it.
            if let Some(info) = space.sent_packets.get(ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    // If this ACK covers everything we had declared lost, the earlier congestion event was
    // spurious.
    if self.detect_spurious_loss(&ack, space, path) {
        self.path_data_mut(path)
            .congestion
            .on_spurious_congestion_event();
    }

    // Collect only the packet numbers that are both acknowledged and still tracked as sent.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.spaces[space].for_path(path).check_ack(range.clone())?;
        for (pn, _) in self.spaces[space]
            .for_path(path)
            .sent_packets
            .iter_range(range)
        {
            newly_acked.insert_one(pn);
        }
    }

    // Nothing new was acknowledged: no further state needs updating.
    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].for_path(path).take(packet) {
            // The acknowledged packet carried ACKs of ours; the peer has now seen them, so
            // the pending ACK state below those packet numbers can be trimmed.
            for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                    pns.pending_acks.subtract_below(*acked_pn);
                }
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery see the acknowledgement; a changed MTU is forwarded to the
            // congestion controller.
            let path_data = self.path_data_mut(path);
            let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
            if mtu_updated {
                path_data
                    .congestion
                    .on_mtu_update(path_data.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(path, packet);

            self.on_packet_acked(now, path, info);
        }
    }

    let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
    let app_limited = self.app_limited;
    let path_data = self.path_data_mut(path);
    let in_flight = path_data.in_flight.bytes;

    path_data
        .congestion
        .on_end_acks(now, in_flight, app_limited, largest_ackd);

    // Take an RTT sample only when the largest acknowledged packet advanced and at least
    // one newly acknowledged packet was ack-eliciting.
    if new_largest && ack_eliciting_acked {
        // Outside the Data space the ACK delay is treated as zero; inside it the reported
        // delay is capped by the peer's maximum ACK delay.
        // NOTE(review): the left shift of the peer-supplied delay can discard high bits for
        // very large values — confirm this is acceptable given the `cmp::min` cap.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = now.saturating_duration_since(
            self.spaces[space].for_path(path).largest_acked_packet_sent,
        );

        let next_pn = self.spaces[space].for_path(path).next_packet_number;
        let path_data = self.path_data_mut(path);
        path_data.rtt.update(ack_delay, rtt);
        // Record the first packet number used after the first RTT sample; it is set once
        // and read by loss detection.
        if path_data.first_packet_after_rtt_sample.is_none() {
            path_data.first_packet_after_rtt_sample = Some((space, next_pn));
        }
    }

    self.detect_lost_packets(now, space, path, true);

    // Reset the PTO backoff once the peer has completed address validation.
    if self.peer_completed_address_validation(path) {
        self.path_data_mut(path).pto_count = 0;
    }

    if self.path_data(path).sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only evaluated for ACKs that advance the largest acknowledged
            // packet.
            if new_largest {
                let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // The peer did not report ECN counts: stop marking packets on this path.
            debug!("ECN not acknowledged by peer");
            self.path_data_mut(path).sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now, path);
    Ok(())
}
2473
2474 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2475 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2476
2477 if lost_packets.is_empty() {
2478 return false;
2479 }
2480
2481 for range in ack.iter() {
2482 let spurious_losses: Vec<u64> = lost_packets
2483 .iter_range(range.clone())
2484 .map(|(pn, _info)| pn)
2485 .collect();
2486
2487 for pn in spurious_losses {
2488 lost_packets.remove(pn);
2489 }
2490 }
2491
2492 lost_packets.is_empty()
2497 }
2498
2499 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2504 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2505
2506 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2507 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2508 }
2509
2510 fn process_ecn(
2512 &mut self,
2513 now: Instant,
2514 space: SpaceId,
2515 path: PathId,
2516 newly_acked: u64,
2517 ecn: frame::EcnCounts,
2518 largest_sent_time: Instant,
2519 ) {
2520 match self.spaces[space]
2521 .for_path(path)
2522 .detect_ecn(newly_acked, ecn)
2523 {
2524 Err(e) => {
2525 debug!("halting ECN due to verification failure: {}", e);
2526
2527 self.path_data_mut(path).sending_ecn = false;
2528 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2531 }
2532 Ok(false) => {}
2533 Ok(true) => {
2534 self.path_stats.entry(path).or_default().congestion_events += 1;
2535 self.path_data_mut(path).congestion.on_congestion_event(
2536 now,
2537 largest_sent_time,
2538 false,
2539 true,
2540 0,
2541 );
2542 }
2543 }
2544 }
2545
2546 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2549 self.paths
2550 .get_mut(&path_id)
2551 .expect("known path")
2552 .remove_in_flight(&info);
2553 let app_limited = self.app_limited;
2554 let path = self.path_data_mut(path_id);
2555 if info.ack_eliciting && !path.is_validating_path() {
2556 let rtt = path.rtt;
2559 path.congestion
2560 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2561 }
2562
2563 if let Some(retransmits) = info.retransmits.get() {
2565 for (id, _) in retransmits.reset_stream.iter() {
2566 self.streams.reset_acked(*id);
2567 }
2568 }
2569
2570 for frame in info.stream_frames {
2571 self.streams.received_ack_of(frame);
2572 }
2573 }
2574
2575 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2576 let start = if self.zero_rtt_crypto.is_some() {
2577 now
2578 } else {
2579 self.prev_crypto
2580 .as_ref()
2581 .expect("no previous keys")
2582 .end_packet
2583 .as_ref()
2584 .expect("update not acknowledged yet")
2585 .1
2586 };
2587
2588 self.timers.set(
2590 Timer::Conn(ConnTimer::KeyDiscard),
2591 start + self.pto_max_path(space, false) * 3,
2592 self.qlog.with_time(now),
2593 );
2594 }
2595
2596 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2609 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2610 self.detect_lost_packets(now, pn_space, path_id, false);
2612 self.set_loss_detection_timer(now, path_id);
2613 return;
2614 }
2615
2616 let (_, space) = match self.pto_time_and_space(now, path_id) {
2617 Some(x) => x,
2618 None => {
2619 error!(%path_id, "PTO expired while unset");
2620 return;
2621 }
2622 };
2623 trace!(
2624 in_flight = self.path_data(path_id).in_flight.bytes,
2625 count = self.path_data(path_id).pto_count,
2626 ?space,
2627 %path_id,
2628 "PTO fired"
2629 );
2630
2631 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2632 0 => {
2635 debug_assert!(!self.peer_completed_address_validation(path_id));
2636 1
2637 }
2638 _ => 2,
2640 };
2641 let pns = self.spaces[space].for_path(path_id);
2642 pns.loss_probes = pns.loss_probes.saturating_add(count);
2643 let path_data = self.path_data_mut(path_id);
2644 path_data.pto_count = path_data.pto_count.saturating_add(1);
2645 self.set_loss_detection_timer(now, path_id);
2646 }
2647
/// Scans the sent packets of `pn_space` on `path_id` and declares packets lost.
///
/// Only packets with a number below the largest acknowledged one are considered. A packet
/// is declared lost when it was sent at least `loss_delay` ago, or when the largest
/// acknowledged packet number is at least `packet_threshold` ahead of it. For the first
/// packet that is not yet lost while no loss time is set, the space's `loss_time` is set to
/// the instant at which that packet would become lost by age.
///
/// `due_to_ack` enables tracking of persistent congestion. The collected results are handed
/// to `handle_lost_packets`.
///
/// # Panics
///
/// Panics if the path has not received any ACK yet (no largest acknowledged packet).
fn detect_lost_packets(
    &mut self,
    now: Instant,
    pn_space: SpaceId,
    path_id: PathId,
    due_to_ack: bool,
) {
    let mut lost_packets = Vec::<u64>::new();
    let mut lost_mtu_probe = None;
    let mut in_persistent_congestion = false;
    let mut size_of_lost_packets = 0u64;
    // Reset the loss time; it is recomputed in the loop below.
    self.spaces[pn_space].for_path(path_id).loss_time = None;

    let path = self.path_data(path_id);
    let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
    // Time threshold for loss: a configured multiple of the conservative RTT, but never
    // below the timer granularity.
    let loss_delay = path
        .rtt
        .conservative()
        .mul_f32(self.config.time_threshold)
        .max(TIMER_GRANULARITY);
    let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

    let largest_acked_packet = self.spaces[pn_space]
        .for_path(path_id)
        .largest_acked_packet
        .expect("detect_lost_packets only to be called if path received at least one ACK");
    let packet_threshold = self.config.packet_threshold as u64;

    // Persistent congestion is declared when a run of consecutively numbered lost
    // ack-eliciting packets spans more than this period. The PTO used here is always the
    // one of the Data space.
    let congestion_period = self
        .pto(SpaceId::Data, path_id)
        .saturating_mul(self.config.persistent_congestion_threshold);
    let mut persistent_congestion_start: Option<Instant> = None;
    let mut prev_packet = None;
    let space = self.spaces[pn_space].for_path(path_id);

    for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
        // A gap in the packet numbers breaks the current run of lost packets.
        if prev_packet != Some(packet.wrapping_sub(1)) {
            persistent_congestion_start = None;
        }

        // Saturating arithmetic avoids a panic when `time_sent` is later than `now`.
        let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
        if packet_too_old || largest_acked_packet >= packet + packet_threshold {
            if Some(packet) == in_flight_mtu_probe {
                // A lost MTU probe is reported separately and not counted with the
                // regular lost packets.
                lost_mtu_probe = in_flight_mtu_probe;
            } else {
                lost_packets.push(packet);
                size_of_lost_packets += info.size as u64;
                if info.ack_eliciting && due_to_ack {
                    match persistent_congestion_start {
                        // The run of lost packets spans more than the congestion period.
                        Some(start) if info.time_sent - start > congestion_period => {
                            in_persistent_congestion = true;
                        }
                        // Start a run, but only for packets that compare greater than the
                        // (space, packet number) recorded after the first RTT sample.
                        None if first_packet_after_rtt_sample
                            .is_some_and(|x| x < (pn_space, packet)) =>
                        {
                            persistent_congestion_start = Some(info.time_sent);
                        }
                        _ => {}
                    }
                }
            }
        } else {
            // Not lost yet: remember when the first such packet would become lost by age.
            if space.loss_time.is_none() {
                space.loss_time = Some(info.time_sent + loss_delay);
            }
            // A packet that is not lost breaks the current run of lost packets.
            persistent_congestion_start = None;
        }

        prev_packet = Some(packet);
    }

    self.handle_lost_packets(
        pn_space,
        path_id,
        now,
        lost_packets,
        lost_mtu_probe,
        loss_delay,
        in_persistent_congestion,
        size_of_lost_packets,
    );
}
2764
2765 fn discard_path(&mut self, path_id: PathId, now: Instant) {
2767 trace!(%path_id, "dropping path state");
2768 let path = self.path_data(path_id);
2769 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2770
2771 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2773 .for_path(path_id)
2774 .sent_packets
2775 .iter()
2776 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2777 .map(|(pn, info)| {
2778 size_of_lost_packets += info.size as u64;
2779 pn
2780 })
2781 .collect();
2782
2783 if !lost_pns.is_empty() {
2784 trace!(
2785 %path_id,
2786 count = lost_pns.len(),
2787 lost_bytes = size_of_lost_packets,
2788 "packets lost on path abandon"
2789 );
2790 self.handle_lost_packets(
2791 SpaceId::Data,
2792 path_id,
2793 now,
2794 lost_pns,
2795 in_flight_mtu_probe,
2796 Duration::ZERO,
2797 false,
2798 size_of_lost_packets,
2799 );
2800 }
2801 self.paths.remove(&path_id);
2802 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2803
2804 let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2805 self.events.push_back(
2806 PathEvent::Abandoned {
2807 id: path_id,
2808 path_stats,
2809 }
2810 .into(),
2811 );
2812 }
2813
/// Applies the consequences of packets having been declared lost on `path_id`.
///
/// `lost_packets` must be sorted in ascending order (checked in debug builds). For each
/// lost packet the in-flight accounting is updated, its stream frames and other
/// retransmittable data are queued again, MTU discovery is informed, and the packet is
/// recorded in the space's lost-packet set. The congestion controller is only notified when
/// the number of bytes in flight actually changed. A lost MTU probe, given separately in
/// `lost_mtu_probe`, is handled without a congestion event.
///
/// # Panics
///
/// Panics if the largest lost packet or the lost MTU probe is no longer tracked as sent, or
/// if `path_id` is not a known path.
fn handle_lost_packets(
    &mut self,
    pn_space: SpaceId,
    path_id: PathId,
    now: Instant,
    lost_packets: Vec<u64>,
    lost_mtu_probe: Option<u64>,
    loss_delay: Duration,
    in_persistent_congestion: bool,
    size_of_lost_packets: u64,
) {
    debug_assert!(
        {
            let mut sorted = lost_packets.clone();
            sorted.sort();
            sorted == lost_packets
        },
        "lost_packets must be sorted"
    );

    // Expire old lost-packet records before adding new ones.
    self.drain_lost_packets(now, pn_space, path_id);

    // Because the list is sorted, the last entry is the largest lost packet number.
    if let Some(largest_lost) = lost_packets.last().cloned() {
        let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
        // Must be read before the loop below removes the packet from `sent_packets`.
        let largest_lost_sent = self.spaces[pn_space]
            .for_path(path_id)
            .sent_packets
            .get(largest_lost)
            .unwrap()
            .time_sent;
        let path_stats = self.path_stats.entry(path_id).or_default();
        path_stats.lost_packets += lost_packets.len() as u64;
        path_stats.lost_bytes += size_of_lost_packets;
        trace!(
            %path_id,
            count = lost_packets.len(),
            lost_bytes = size_of_lost_packets,
            "packets lost",
        );

        for &packet in &lost_packets {
            // Skip packets that are no longer tracked as sent.
            let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                continue;
            };
            self.qlog
                .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);

            // Queue the lost data for retransmission.
            for frame in info.stream_frames {
                self.streams.retransmit(frame);
            }
            self.spaces[pn_space].pending |= info.retransmits;
            self.path_data_mut(path_id)
                .mtud
                .on_non_probe_lost(packet, info.size);

            // Remember the loss so a later ACK for this packet can be recognised as a
            // spurious loss.
            self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                packet,
                LostPacket {
                    time_sent: info.time_sent,
                },
            );
        }

        let path = self.path_data_mut(path_id);
        if path.mtud.black_hole_detected(now) {
            // The MTU changed: inform congestion control and drop queued datagrams that no
            // longer fit.
            path.congestion.on_mtu_update(path.mtud.current_mtu());
            if let Some(max_datagram_size) = self.datagrams().max_size() {
                self.datagrams.drop_oversized(max_datagram_size);
            }
            self.path_stats
                .entry(path_id)
                .or_default()
                .black_holes_detected += 1;
        }

        // Only a change in bytes in flight leads to a congestion event.
        let lost_ack_eliciting =
            old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

        if lost_ack_eliciting {
            self.path_stats
                .entry(path_id)
                .or_default()
                .congestion_events += 1;
            self.path_data_mut(path_id).congestion.on_congestion_event(
                now,
                largest_lost_sent,
                in_persistent_congestion,
                false,
                size_of_lost_packets,
            );
        }
    }

    // A lost MTU probe is removed from the in-flight accounting and reported to MTU
    // discovery; no congestion event is raised for it.
    if let Some(packet) = lost_mtu_probe {
        // The probe is always looked up in the Data space. It is expected to still be
        // tracked as sent because callers keep it out of `lost_packets`.
        let info = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .take(packet)
            .unwrap();
        self.paths
            .get_mut(&path_id)
            .unwrap()
            .remove_in_flight(&info);
        self.path_data_mut(path_id).mtud.on_probe_lost();
        self.path_stats
            .entry(path_id)
            .or_default()
            .lost_plpmtud_probes += 1;
    }
}
2931
2932 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2938 SpaceId::iter()
2939 .filter_map(|id| {
2940 self.spaces[id]
2941 .number_spaces
2942 .get(&path_id)
2943 .and_then(|pns| pns.loss_time)
2944 .map(|time| (time, id))
2945 })
2946 .min_by_key(|&(time, _)| time)
2947 }
2948
2949 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2951 let path = self.path(path_id)?;
2952 let pto_count = path.pto_count;
2953 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2954 let mut duration = path.rtt.pto_base() * backoff;
2955
2956 if path_id == PathId::ZERO
2957 && path.in_flight.ack_eliciting == 0
2958 && !self.peer_completed_address_validation(PathId::ZERO)
2959 {
2960 let space = match self.highest_space {
2966 SpaceId::Handshake => SpaceId::Handshake,
2967 _ => SpaceId::Initial,
2968 };
2969
2970 return Some((now + duration, space));
2971 }
2972
2973 let mut result = None;
2974 for space in SpaceId::iter() {
2975 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2976 continue;
2977 };
2978
2979 if !pns.has_in_flight() {
2980 continue;
2981 }
2982 if space == SpaceId::Data {
2983 if self.is_handshaking() {
2985 return result;
2986 }
2987 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2989 }
2990 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
2991 continue;
2992 };
2993 let pto = last_ack_eliciting + duration;
2994 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2995 if path.anti_amplification_blocked(1) {
2996 continue;
2998 }
2999 if path.in_flight.ack_eliciting == 0 {
3000 continue;
3002 }
3003 result = Some((pto, space));
3004 }
3005 }
3006 result
3007 }
3008
3009 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3010 if self.side.is_server() || self.state.is_closed() {
3012 return true;
3013 }
3014 self.spaces[SpaceId::Handshake]
3017 .path_space(PathId::ZERO)
3018 .and_then(|pns| pns.largest_acked_packet)
3019 .is_some()
3020 || self.spaces[SpaceId::Data]
3021 .path_space(path)
3022 .and_then(|pns| pns.largest_acked_packet)
3023 .is_some()
3024 || (self.spaces[SpaceId::Data].crypto.is_some()
3025 && self.spaces[SpaceId::Handshake].crypto.is_none())
3026 }
3027
3028 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3036 if self.state.is_closed() {
3037 return;
3041 }
3042
3043 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3044 self.timers.set(
3046 Timer::PerPath(path_id, PathTimer::LossDetection),
3047 loss_time,
3048 self.qlog.with_time(now),
3049 );
3050 return;
3051 }
3052
3053 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3056 self.timers.set(
3057 Timer::PerPath(path_id, PathTimer::LossDetection),
3058 timeout,
3059 self.qlog.with_time(now),
3060 );
3061 } else {
3062 self.timers.stop(
3063 Timer::PerPath(path_id, PathTimer::LossDetection),
3064 self.qlog.with_time(now),
3065 );
3066 }
3067 }
3068
3069 fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3075 match space {
3076 SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3077 SpaceId::Data => self
3078 .paths
3079 .iter()
3080 .filter_map(|(path_id, state)| {
3081 if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3082 None
3084 } else {
3085 let pto = self.pto(space, *path_id);
3086 Some(pto)
3087 }
3088 })
3089 .max()
3090 .expect("there should be one at least path"),
3091 }
3092 }
3093
3094 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3099 let max_ack_delay = match space {
3100 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3101 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3102 };
3103 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3104 }
3105
3106 fn on_packet_authenticated(
3107 &mut self,
3108 now: Instant,
3109 space_id: SpaceId,
3110 path_id: PathId,
3111 ecn: Option<EcnCodepoint>,
3112 packet: Option<u64>,
3113 spin: bool,
3114 is_1rtt: bool,
3115 ) {
3116 self.total_authed_packets += 1;
3117 if let Some(last_allowed_receive) = self
3118 .paths
3119 .get(&path_id)
3120 .and_then(|path| path.data.last_allowed_receive)
3121 {
3122 if now > last_allowed_receive {
3123 warn!("received data on path which we abandoned more than 3 * PTO ago");
3124 if !self.state.is_closed() {
3126 self.state.move_to_closed(TransportError::NO_ERROR(
3128 "peer failed to respond with PATH_ABANDON in time",
3129 ));
3130 self.close_common();
3131 self.set_close_timer(now);
3132 self.close = true;
3133 }
3134 return;
3135 }
3136 }
3137
3138 self.reset_keep_alive(path_id, now);
3139 self.reset_idle_timeout(now, space_id, path_id);
3140 self.permit_idle_reset = true;
3141 self.receiving_ecn |= ecn.is_some();
3142 if let Some(x) = ecn {
3143 let space = &mut self.spaces[space_id];
3144 space.for_path(path_id).ecn_counters += x;
3145
3146 if x.is_ce() {
3147 space
3148 .for_path(path_id)
3149 .pending_acks
3150 .set_immediate_ack_required();
3151 }
3152 }
3153
3154 let packet = match packet {
3155 Some(x) => x,
3156 None => return,
3157 };
3158 match &self.side {
3159 ConnectionSide::Client { .. } => {
3160 if space_id == SpaceId::Handshake {
3164 if let Some(hs) = self.state.as_handshake_mut() {
3165 hs.allow_server_migration = false;
3166 }
3167 }
3168 }
3169 ConnectionSide::Server { .. } => {
3170 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3171 {
3172 self.discard_space(now, SpaceId::Initial);
3174 }
3175 if self.zero_rtt_crypto.is_some() && is_1rtt {
3176 self.set_key_discard_timer(now, space_id)
3178 }
3179 }
3180 }
3181 let space = self.spaces[space_id].for_path(path_id);
3182 space.pending_acks.insert_one(packet, now);
3183 if packet >= space.rx_packet.unwrap_or_default() {
3184 space.rx_packet = Some(packet);
3185 self.spin = self.side.is_client() ^ spin;
3187 }
3188 }
3189
3190 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3195 if let Some(timeout) = self.idle_timeout {
3197 if self.state.is_closed() {
3198 self.timers
3199 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3200 } else {
3201 let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3202 self.timers.set(
3203 Timer::Conn(ConnTimer::Idle),
3204 now + dt,
3205 self.qlog.with_time(now),
3206 );
3207 }
3208 }
3209
3210 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3212 if self.state.is_closed() {
3213 self.timers.stop(
3214 Timer::PerPath(path_id, PathTimer::PathIdle),
3215 self.qlog.with_time(now),
3216 );
3217 } else {
3218 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3219 self.timers.set(
3220 Timer::PerPath(path_id, PathTimer::PathIdle),
3221 now + dt,
3222 self.qlog.with_time(now),
3223 );
3224 }
3225 }
3226 }
3227
3228 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3230 if !self.state.is_established() {
3231 return;
3232 }
3233
3234 if let Some(interval) = self.config.keep_alive_interval {
3235 self.timers.set(
3236 Timer::Conn(ConnTimer::KeepAlive),
3237 now + interval,
3238 self.qlog.with_time(now),
3239 );
3240 }
3241
3242 if let Some(interval) = self.path_data(path_id).keep_alive {
3243 self.timers.set(
3244 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3245 now + interval,
3246 self.qlog.with_time(now),
3247 );
3248 }
3249 }
3250
3251 fn reset_cid_retirement(&mut self, now: Instant) {
3253 if let Some((_path, t)) = self.next_cid_retirement() {
3254 self.timers.set(
3255 Timer::Conn(ConnTimer::PushNewCid),
3256 t,
3257 self.qlog.with_time(now),
3258 );
3259 }
3260 }
3261
3262 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3264 self.local_cid_state
3265 .iter()
3266 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3267 .min_by_key(|(_path_id, timeout)| *timeout)
3268 }
3269
3270 pub(crate) fn handle_first_packet(
3275 &mut self,
3276 now: Instant,
3277 network_path: FourTuple,
3278 ecn: Option<EcnCodepoint>,
3279 packet_number: u64,
3280 packet: InitialPacket,
3281 remaining: Option<BytesMut>,
3282 ) -> Result<(), ConnectionError> {
3283 let span = trace_span!("first recv");
3284 let _guard = span.enter();
3285 debug_assert!(self.side.is_server());
3286 let len = packet.header_data.len() + packet.payload.len();
3287 let path_id = PathId::ZERO;
3288 self.path_data_mut(path_id).total_recvd = len as u64;
3289
3290 if let Some(hs) = self.state.as_handshake_mut() {
3291 hs.expected_token = packet.header.token.clone();
3292 } else {
3293 unreachable!("first packet must be delivered in Handshake state");
3294 }
3295
3296 self.on_packet_authenticated(
3298 now,
3299 SpaceId::Initial,
3300 path_id,
3301 ecn,
3302 Some(packet_number),
3303 false,
3304 false,
3305 );
3306
3307 let packet: Packet = packet.into();
3308
3309 let mut qlog = QlogRecvPacket::new(len);
3310 qlog.header(&packet.header, Some(packet_number), path_id);
3311
3312 self.process_decrypted_packet(
3313 now,
3314 network_path,
3315 path_id,
3316 Some(packet_number),
3317 packet,
3318 &mut qlog,
3319 )?;
3320 self.qlog.emit_packet_received(qlog, now);
3321 if let Some(data) = remaining {
3322 self.handle_coalesced(now, network_path, path_id, ecn, data);
3323 }
3324
3325 self.qlog.emit_recovery_metrics(
3326 path_id,
3327 &mut self.paths.get_mut(&path_id).unwrap().data,
3328 now,
3329 );
3330
3331 Ok(())
3332 }
3333
3334 fn init_0rtt(&mut self, now: Instant) {
3335 let (header, packet) = match self.crypto.early_crypto() {
3336 Some(x) => x,
3337 None => return,
3338 };
3339 if self.side.is_client() {
3340 match self.crypto.transport_parameters() {
3341 Ok(params) => {
3342 let params = params
3343 .expect("crypto layer didn't supply transport parameters with ticket");
3344 let params = TransportParameters {
3346 initial_src_cid: None,
3347 original_dst_cid: None,
3348 preferred_address: None,
3349 retry_src_cid: None,
3350 stateless_reset_token: None,
3351 min_ack_delay: None,
3352 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3353 max_ack_delay: TransportParameters::default().max_ack_delay,
3354 initial_max_path_id: None,
3355 ..params
3356 };
3357 self.set_peer_params(params);
3358 self.qlog.emit_peer_transport_params_restored(self, now);
3359 }
3360 Err(e) => {
3361 error!("session ticket has malformed transport parameters: {}", e);
3362 return;
3363 }
3364 }
3365 }
3366 trace!("0-RTT enabled");
3367 self.zero_rtt_enabled = true;
3368 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3369 }
3370
3371 fn read_crypto(
3372 &mut self,
3373 space: SpaceId,
3374 crypto: &frame::Crypto,
3375 payload_len: usize,
3376 ) -> Result<(), TransportError> {
3377 let expected = if !self.state.is_handshake() {
3378 SpaceId::Data
3379 } else if self.highest_space == SpaceId::Initial {
3380 SpaceId::Initial
3381 } else {
3382 SpaceId::Handshake
3385 };
3386 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3390
3391 let end = crypto.offset + crypto.data.len() as u64;
3392 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3393 warn!(
3394 "received new {:?} CRYPTO data when expecting {:?}",
3395 space, expected
3396 );
3397 return Err(TransportError::PROTOCOL_VIOLATION(
3398 "new data at unexpected encryption level",
3399 ));
3400 }
3401
3402 let space = &mut self.spaces[space];
3403 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3404 if max > self.config.crypto_buffer_size as u64 {
3405 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3406 }
3407
3408 space
3409 .crypto_stream
3410 .insert(crypto.offset, crypto.data.clone(), payload_len);
3411 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3412 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3413 if self.crypto.read_handshake(&chunk.bytes)? {
3414 self.events.push_back(Event::HandshakeDataReady);
3415 }
3416 }
3417
3418 Ok(())
3419 }
3420
3421 fn write_crypto(&mut self) {
3422 loop {
3423 let space = self.highest_space;
3424 let mut outgoing = Vec::new();
3425 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3426 match space {
3427 SpaceId::Initial => {
3428 self.upgrade_crypto(SpaceId::Handshake, crypto);
3429 }
3430 SpaceId::Handshake => {
3431 self.upgrade_crypto(SpaceId::Data, crypto);
3432 }
3433 _ => unreachable!("got updated secrets during 1-RTT"),
3434 }
3435 }
3436 if outgoing.is_empty() {
3437 if space == self.highest_space {
3438 break;
3439 } else {
3440 continue;
3442 }
3443 }
3444 let offset = self.spaces[space].crypto_offset;
3445 let outgoing = Bytes::from(outgoing);
3446 if let Some(hs) = self.state.as_handshake_mut() {
3447 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3448 hs.client_hello = Some(outgoing.clone());
3449 }
3450 }
3451 self.spaces[space].crypto_offset += outgoing.len() as u64;
3452 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3453 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3454 offset,
3455 data: outgoing,
3456 });
3457 }
3458 }
3459
3460 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3462 debug_assert!(
3463 self.spaces[space].crypto.is_none(),
3464 "already reached packet space {space:?}"
3465 );
3466 trace!("{:?} keys ready", space);
3467 if space == SpaceId::Data {
3468 self.next_crypto = Some(
3470 self.crypto
3471 .next_1rtt_keys()
3472 .expect("handshake should be complete"),
3473 );
3474 }
3475
3476 self.spaces[space].crypto = Some(crypto);
3477 debug_assert!(space as usize > self.highest_space as usize);
3478 self.highest_space = space;
3479 if space == SpaceId::Data && self.side.is_client() {
3480 self.zero_rtt_crypto = None;
3482 }
3483 }
3484
3485 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3486 debug_assert!(space_id != SpaceId::Data);
3487 trace!("discarding {:?} keys", space_id);
3488 if space_id == SpaceId::Initial {
3489 if let ConnectionSide::Client { token, .. } = &mut self.side {
3491 *token = Bytes::new();
3492 }
3493 }
3494 let space = &mut self.spaces[space_id];
3495 space.crypto = None;
3496 let pns = space.for_path(PathId::ZERO);
3497 pns.time_of_last_ack_eliciting_packet = None;
3498 pns.loss_time = None;
3499 pns.loss_probes = 0;
3500 let sent_packets = mem::take(&mut pns.sent_packets);
3501 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3502 for (_, packet) in sent_packets.into_iter() {
3503 path.data.remove_in_flight(&packet);
3504 }
3505
3506 self.set_loss_detection_timer(now, PathId::ZERO)
3507 }
3508
3509 fn handle_coalesced(
3510 &mut self,
3511 now: Instant,
3512 network_path: FourTuple,
3513 path_id: PathId,
3514 ecn: Option<EcnCodepoint>,
3515 data: BytesMut,
3516 ) {
3517 self.path_data_mut(path_id)
3518 .inc_total_recvd(data.len() as u64);
3519 let mut remaining = Some(data);
3520 let cid_len = self
3521 .local_cid_state
3522 .values()
3523 .map(|cid_state| cid_state.cid_len())
3524 .next()
3525 .expect("one cid_state must exist");
3526 while let Some(data) = remaining {
3527 match PartialDecode::new(
3528 data,
3529 &FixedLengthConnectionIdParser::new(cid_len),
3530 &[self.version],
3531 self.endpoint_config.grease_quic_bit,
3532 ) {
3533 Ok((partial_decode, rest)) => {
3534 remaining = rest;
3535 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
3536 }
3537 Err(e) => {
3538 trace!("malformed header: {}", e);
3539 return;
3540 }
3541 }
3542 }
3543 }
3544
3545 fn handle_decode(
3546 &mut self,
3547 now: Instant,
3548 network_path: FourTuple,
3549 path_id: PathId,
3550 ecn: Option<EcnCodepoint>,
3551 partial_decode: PartialDecode,
3552 ) {
3553 let qlog = QlogRecvPacket::new(partial_decode.len());
3554 if let Some(decoded) = packet_crypto::unprotect_header(
3555 partial_decode,
3556 &self.spaces,
3557 self.zero_rtt_crypto.as_ref(),
3558 self.peer_params.stateless_reset_token,
3559 ) {
3560 self.handle_packet(
3561 now,
3562 network_path,
3563 path_id,
3564 ecn,
3565 decoded.packet,
3566 decoded.stateless_reset,
3567 qlog,
3568 );
3569 }
3570 }
3571
/// Handles one packet whose header protection has already been removed.
///
/// The packet is filtered (handshake-time path checks), decrypted, de-duplicated and
/// then processed. Any resulting error is turned into a connection state transition,
/// and the close/drain timers and endpoint events are updated to match.
///
/// `packet` may be `None`; together with `stateless_reset` this distinguishes a
/// stateless reset from a packet that failed to authenticate.
3572 fn handle_packet(
3573 &mut self,
3574 now: Instant,
3575 network_path: FourTuple,
3576 path_id: PathId,
3577 ecn: Option<EcnCodepoint>,
3578 packet: Option<Packet>,
3579 stateless_reset: bool,
3580 mut qlog: QlogRecvPacket,
3581 ) {
3582 self.stats.udp_rx.ios += 1;
3583 if let Some(ref packet) = packet {
3584 trace!(
3585 "got {:?} packet ({} bytes) from {} using id {}",
3586 packet.header.space(),
3587 packet.payload.len() + packet.header_data.len(),
3588 network_path,
3589 packet.header.dst_cid(),
3590 );
3591 }
3592
// While handshaking, only path zero is accepted, and a packet from a different
// network path is only accepted if the handshake state still allows server migration.
3593 if self.is_handshaking() {
3594 if path_id != PathId::ZERO {
3595 debug!(%network_path, %path_id, "discarding multipath packet during handshake");
3596 return;
3597 }
3598 if network_path != self.path_data_mut(path_id).network_path {
3599 if let Some(hs) = self.state.as_handshake() {
3600 if hs.allow_server_migration {
3601 trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
3602 self.path_data_mut(path_id).network_path = network_path;
3603 self.qlog.emit_tuple_assigned(path_id, network_path, now);
3604 } else {
3605 debug!("discarding packet with unexpected remote during handshake");
3606 return;
3607 }
3608 } else {
3609 debug!("discarding packet with unexpected remote during handshake");
3610 return;
3611 }
3612 }
3613 }
3614
// Remember the state before processing so transitions can be detected afterwards.
3615 let was_closed = self.state.is_closed();
3616 let was_drained = self.state.is_drained();
3617
// `Err(None)` stands for "no packet could be authenticated".
3618 let decrypted = match packet {
3619 None => Err(None),
3620 Some(mut packet) => self
3621 .decrypt_packet(now, path_id, &mut packet)
3622 .map(move |number| (packet, number)),
3623 };
3624 let result = match decrypted {
// A stateless reset takes priority over whatever decryption produced.
3625 _ if stateless_reset => {
3626 debug!("got stateless reset");
3627 Err(ConnectionError::Reset)
3628 }
3629 Err(Some(e)) => {
3630 warn!("illegal packet: {}", e);
3631 Err(e.into())
3632 }
3633 Err(None) => {
3634 debug!("failed to authenticate packet");
3635 self.authentication_failures += 1;
// The limit is taken from the local packet key of the highest space reached.
3636 let integrity_limit = self.spaces[self.highest_space]
3637 .crypto
3638 .as_ref()
3639 .unwrap()
3640 .packet
3641 .local
3642 .integrity_limit();
3643 if self.authentication_failures > integrity_limit {
3644 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3645 } else {
3646 return;
3647 }
3648 }
3649 Ok((packet, number)) => {
3650 qlog.header(&packet.header, number, path_id);
3651 let span = match number {
3652 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3653 None => trace_span!("recv", space = ?packet.header.space()),
3654 };
3655 let _guard = span.enter();
3656
3657 let dedup = self.spaces[packet.header.space()]
3658 .path_space_mut(path_id)
3659 .map(|pns| &mut pns.dedup);
// The packet is dropped as a possible duplicate when `dedup.insert` returns true.
// The check is skipped if there is no packet number or no state for this path.
3660 if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3661 debug!("discarding possible duplicate packet");
3662 self.qlog.emit_packet_received(qlog, now);
3663 return;
3664 } else if self.state.is_handshake() && packet.header.is_short() {
3665 trace!("dropping short packet during handshake");
3667 self.qlog.emit_packet_received(qlog, now);
3668 return;
3669 } else {
// A server in the handshake state drops Initial packets whose token differs from
// the expected one.
3670 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3671 if let Some(hs) = self.state.as_handshake() {
3672 if self.side.is_server() && token != &hs.expected_token {
3673 warn!("discarding Initial with invalid retry token");
3677 self.qlog.emit_packet_received(qlog, now);
3678 return;
3679 }
3680 }
3681 }
3682
3683 if !self.state.is_closed() {
// Only short-header packets carry a spin bit; other headers report `false`.
3684 let spin = match packet.header {
3685 Header::Short { spin, .. } => spin,
3686 _ => false,
3687 };
3688
// On the server, make sure the path exists unless it was abandoned.
3689 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3690 self.ensure_path(path_id, network_path, now, number);
3692 }
3693 if self.paths.contains_key(&path_id) {
3694 self.on_packet_authenticated(
3695 now,
3696 packet.header.space(),
3697 path_id,
3698 ecn,
3699 number,
3700 spin,
3701 packet.header.is_1rtt(),
3702 );
3703 }
3704 }
3705
3706 let res = self.process_decrypted_packet(
3707 now,
3708 network_path,
3709 path_id,
3710 number,
3711 packet,
3712 &mut qlog,
3713 );
3714
3715 self.qlog.emit_packet_received(qlog, now);
3716 res
3717 }
3718 }
3719 };
3720
// Map the processing error, if any, onto a connection state transition.
3721 if let Err(conn_err) = result {
3723 match conn_err {
3724 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3725 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
// A stateless reset or an exceeded integrity limit moves straight to drained.
3726 ConnectionError::Reset
3727 | ConnectionError::TransportError(TransportError {
3728 code: TransportErrorCode::AEAD_LIMIT_REACHED,
3729 ..
3730 }) => {
3731 self.state.move_to_drained(Some(conn_err));
3732 }
3733 ConnectionError::TimedOut => {
3734 unreachable!("timeouts aren't generated by packet processing");
3735 }
3736 ConnectionError::TransportError(err) => {
3737 debug!("closing connection due to transport error: {}", err);
3738 self.state.move_to_closed(err);
3739 }
3740 ConnectionError::VersionMismatch => {
3741 self.state.move_to_draining(Some(conn_err));
3742 }
3743 ConnectionError::LocallyClosed => {
3744 unreachable!("LocallyClosed isn't generated by packet processing");
3745 }
3746 ConnectionError::CidsExhausted => {
3747 unreachable!("CidsExhausted isn't generated by packet processing");
3748 }
3749 };
3750 }
3751
// Newly closed: run the common close handling and, unless already drained, start the
// close timer.
3752 if !was_closed && self.state.is_closed() {
3753 self.close_common();
3754 if !self.state.is_drained() {
3755 self.set_close_timer(now);
3756 }
3757 }
// Newly drained: notify the endpoint and stop the close timer.
3758 if !was_drained && self.state.is_drained() {
3759 self.endpoint_events.push_back(EndpointEventInner::Drained);
3760 self.timers
3763 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3764 }
3765
// In the Closed state, `self.close` is set to whether this packet arrived on the
// network path recorded for `path_id`; an unknown path counts as a match.
3766 if matches!(self.state.as_type(), StateType::Closed) {
3768 let path_remote = self
3772 .paths
3773 .get(&path_id)
3774 .map(|p| p.data.network_path)
3775 .unwrap_or(network_path);
3776 self.close = network_path == path_remote;
3777 }
3778 }
3779
/// Processes an authenticated, decrypted packet according to the connection state.
///
/// Packets for unknown paths are dropped. In the Established state the payload is
/// dispatched by packet space; in the Closed state only a Close frame has an effect; in
/// the Draining and Drained states the packet is ignored. In the Handshake state the
/// packet is handled according to its header type.
///
/// # Errors
///
/// Returns an error when payload processing fails, when a server receives a Retry
/// packet, when transport parameters are missing or invalid, or
/// `ConnectionError::VersionMismatch` when a Version Negotiation packet does not list
/// our version.
///
/// # Panics
///
/// Panics if a short-header packet reaches the Handshake branch, or if `number` is
/// `None` for a packet passed to `process_payload`.
3780 fn process_decrypted_packet(
3781 &mut self,
3782 now: Instant,
3783 network_path: FourTuple,
3784 path_id: PathId,
3785 number: Option<u64>,
3786 packet: Packet,
3787 qlog: &mut QlogRecvPacket,
3788 ) -> Result<(), ConnectionError> {
3789 if !self.paths.contains_key(&path_id) {
3790 trace!(%path_id, ?number, "discarding packet for unknown path");
3794 return Ok(());
3795 }
// Every state except Handshake returns from inside this match; `state` is the
// mutable handshake state used by the header dispatch below.
3796 let state = match self.state.as_type() {
3797 StateType::Established => {
3798 match packet.header.space() {
3799 SpaceId::Data => self.process_payload(
3800 now,
3801 network_path,
3802 path_id,
3803 number.unwrap(),
3804 packet,
3805 qlog,
3806 )?,
// Packets from earlier spaces are still processed if their header type carries frames.
3807 _ if packet.header.has_frames() => {
3808 self.process_early_payload(now, path_id, packet, qlog)?
3809 }
3810 _ => {
3811 trace!("discarding unexpected pre-handshake packet");
3812 }
3813 }
3814 return Ok(());
3815 }
3816 StateType::Closed => {
// Frames that fail to decode are skipped; a Close frame moves the connection to
// draining.
3817 for result in frame::Iter::new(packet.payload.freeze())? {
3818 let frame = match result {
3819 Ok(frame) => frame,
3820 Err(err) => {
3821 debug!("frame decoding error: {err:?}");
3822 continue;
3823 }
3824 };
3825 qlog.frame(&frame);
3826
3827 if let Frame::Padding = frame {
3828 continue;
3829 };
3830
3831 self.stats.frame_rx.record(frame.ty());
3832
3833 if let Frame::Close(_error) = frame {
3834 self.state.move_to_draining(None);
3835 break;
3836 }
3837 }
3838 return Ok(());
3839 }
3840 StateType::Draining | StateType::Drained => return Ok(()),
3841 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3842 };
3843
3844 match packet.header {
3845 Header::Retry {
3846 src_cid: rem_cid, ..
3847 } => {
3848 debug_assert_eq!(path_id, PathId::ZERO);
3849 if self.side.is_server() {
3850 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3851 }
3852
// The Retry is checked against the currently active remote CID of this path.
// Without remote CID state for the path it counts as invalid.
3853 let is_valid_retry = self
3854 .rem_cids
3855 .get(&path_id)
3856 .map(|cids| cids.active())
3857 .map(|orig_dst_cid| {
3858 self.crypto.is_valid_retry(
3859 orig_dst_cid,
3860 &packet.header_data,
3861 &packet.payload,
3862 )
3863 })
3864 .unwrap_or_default();
// The Retry is ignored if more than one packet has been authenticated so far, if the
// payload is not longer than 16 bytes, or if validation failed.
3865 if self.total_authed_packets > 1
3866 || packet.payload.len() <= 16 || !is_valid_retry
3868 {
3869 trace!("discarding invalid Retry");
3870 return Ok(());
3878 }
3879
3880 trace!("retrying with CID {}", rem_cid);
3881 let client_hello = state.client_hello.take().unwrap();
3882 self.retry_src_cid = Some(rem_cid);
3883 self.rem_cids
3884 .get_mut(&path_id)
3885 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3886 .update_initial_cid(rem_cid);
3887 self.rem_handshake_cid = rem_cid;
3888
// If Initial packet number 0 is still recorded as sent, it is handled as acknowledged.
3889 let space = &mut self.spaces[SpaceId::Initial];
3890 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3891 self.on_packet_acked(now, PathId::ZERO, info);
3892 };
3893
// Replace the Initial space with a fresh one that uses keys derived from the new CID,
// keeps the old next packet number, and re-queues the client hello at offset 0.
3894 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = {
3897 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3898 space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3899 space.crypto_offset = client_hello.len() as u64;
3900 space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3901 .for_path(path_id)
3902 .next_packet_number;
3903 space.pending.crypto.push_back(frame::Crypto {
3904 offset: 0,
3905 data: client_hello,
3906 });
3907 space
3908 };
3909
// Packets already sent in the Data space on path zero are taken out of flight and their
// retransmittable content is queued again.
3910 let zero_rtt = mem::take(
3912 &mut self.spaces[SpaceId::Data]
3913 .for_path(PathId::ZERO)
3914 .sent_packets,
3915 );
3916 for (_, info) in zero_rtt.into_iter() {
3917 self.paths
3918 .get_mut(&PathId::ZERO)
3919 .unwrap()
3920 .remove_in_flight(&info);
3921 self.spaces[SpaceId::Data].pending |= info.retransmits;
3922 }
3923 self.streams.retransmit_all_for_0rtt();
3924
// The token is the payload without its last 16 bytes; the length check above
// guarantees the subtraction cannot underflow.
3925 let token_len = packet.payload.len() - 16;
3926 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3927 unreachable!("we already short-circuited if we're server");
3928 };
3929 *token = packet.payload.freeze().split_to(token_len);
3930
// Start over with a fresh handshake state.
3931 self.state = State::handshake(state::Handshake {
3932 expected_token: Bytes::new(),
3933 rem_cid_set: false,
3934 client_hello: None,
3935 allow_server_migration: true,
3936 });
3937 Ok(())
3938 }
3939 Header::Long {
3940 ty: LongType::Handshake,
3941 src_cid: rem_cid,
3942 dst_cid: loc_cid,
3943 ..
3944 } => {
3945 debug_assert_eq!(path_id, PathId::ZERO);
3946 if rem_cid != self.rem_handshake_cid {
3947 debug!(
3948 "discarding packet with mismatched remote CID: {} != {}",
3949 self.rem_handshake_cid, rem_cid
3950 );
3951 return Ok(());
3952 }
3953 self.on_path_validated(path_id);
3954
3955 self.process_early_payload(now, path_id, packet, qlog)?;
3956 if self.state.is_closed() {
3957 return Ok(());
3958 }
3959
3960 if self.crypto.is_handshaking() {
3961 trace!("handshake ongoing");
3962 return Ok(());
3963 }
3964
// From here on the crypto layer no longer reports an ongoing handshake.
3965 if self.side.is_client() {
3966 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3968 TransportError::new(
3969 TransportErrorCode::crypto(0x6d),
3970 "transport parameters missing".to_owned(),
3971 )
3972 })?;
3973
3974 if self.has_0rtt() {
3975 if !self.crypto.early_data_accepted().unwrap() {
3976 debug_assert!(self.side.is_client());
3977 debug!("0-RTT rejected");
3978 self.accepted_0rtt = false;
3979 self.streams.zero_rtt_rejected();
3980
// On rejection, all pending Data-space retransmits are dropped and the packets sent so
// far in that space are taken out of flight.
3981 self.spaces[SpaceId::Data].pending = Retransmits::default();
3983
3984 let sent_packets = mem::take(
3986 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3987 );
3988 for (_, packet) in sent_packets.into_iter() {
3989 self.paths
3990 .get_mut(&path_id)
3991 .unwrap()
3992 .remove_in_flight(&packet);
3993 }
3994 } else {
3995 self.accepted_0rtt = true;
3996 params.validate_resumption_from(&self.peer_params)?;
3997 }
3998 }
3999 if let Some(token) = params.stateless_reset_token {
4000 let remote = self.path_data(path_id).network_path.remote;
4002 self.endpoint_events
4003 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4004 }
4005 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4006 self.issue_first_cids(now);
4007 } else {
// Server side: queue HANDSHAKE_DONE, drop the Handshake space and report confirmation.
4008 self.spaces[SpaceId::Data].pending.handshake_done = true;
4010 self.discard_space(now, SpaceId::Handshake);
4011 self.events.push_back(Event::HandshakeConfirmed);
4012 trace!("handshake confirmed");
4013 }
4014
4015 self.events.push_back(Event::Connected);
4016 self.state.move_to_established();
4017 trace!("established");
4018
4019 self.issue_first_path_cids(now);
4022 Ok(())
4023 }
4024 Header::Initial(InitialHeader {
4025 src_cid: rem_cid,
4026 dst_cid: loc_cid,
4027 ..
4028 }) => {
4029 debug_assert_eq!(path_id, PathId::ZERO);
// The first Initial packet fixes the remote CID; later ones must match it.
4030 if !state.rem_cid_set {
4031 trace!("switching remote CID to {}", rem_cid);
4032 let mut state = state.clone();
4033 self.rem_cids
4034 .get_mut(&path_id)
4035 .expect("PathId::ZERO not yet abandoned")
4036 .update_initial_cid(rem_cid);
4037 self.rem_handshake_cid = rem_cid;
4038 self.orig_rem_cid = rem_cid;
4039 state.rem_cid_set = true;
4040 self.state.move_to_handshake(state);
4041 } else if rem_cid != self.rem_handshake_cid {
4042 debug!(
4043 "discarding packet with mismatched remote CID: {} != {}",
4044 self.rem_handshake_cid, rem_cid
4045 );
4046 return Ok(());
4047 }
4048
4049 let starting_space = self.highest_space;
4050 self.process_early_payload(now, path_id, packet, qlog)?;
4051
// When processing this packet moved the server past the Initial space, the peer's
// transport parameters are read and applied, and 0-RTT is initialized.
4052 if self.side.is_server()
4053 && starting_space == SpaceId::Initial
4054 && self.highest_space != SpaceId::Initial
4055 {
4056 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4057 TransportError::new(
4058 TransportErrorCode::crypto(0x6d),
4059 "transport parameters missing".to_owned(),
4060 )
4061 })?;
4062 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4063 self.issue_first_cids(now);
4064 self.init_0rtt(now);
4065 }
4066 Ok(())
4067 }
4068 Header::Long {
4069 ty: LongType::ZeroRtt,
4070 ..
4071 } => {
4072 self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4073 Ok(())
4074 }
4075 Header::VersionNegotiate { .. } => {
// Version negotiation is ignored once more than one packet has been authenticated.
4076 if self.total_authed_packets > 1 {
4077 return Ok(());
4078 }
// The payload is read as 4-byte big-endian version numbers; a trailing partial chunk
// never matches.
4079 let supported = packet
4080 .payload
4081 .chunks(4)
4082 .any(|x| match <[u8; 4]>::try_from(x) {
4083 Ok(version) => self.version == u32::from_be_bytes(version),
4084 Err(_) => false,
4085 });
// A packet that lists our own version is ignored.
4086 if supported {
4087 return Ok(());
4088 }
4089 debug!("remote doesn't support our version");
4090 Err(ConnectionError::VersionMismatch)
4091 }
4092 Header::Short { .. } => unreachable!(
4093 "short packets received during handshake are discarded in handle_packet"
4094 ),
4095 }
4096 }
4097
4098 fn process_early_payload(
4100 &mut self,
4101 now: Instant,
4102 path_id: PathId,
4103 packet: Packet,
4104 #[allow(unused)] qlog: &mut QlogRecvPacket,
4105 ) -> Result<(), TransportError> {
4106 debug_assert_ne!(packet.header.space(), SpaceId::Data);
4107 debug_assert_eq!(path_id, PathId::ZERO);
4108 let payload_len = packet.payload.len();
4109 let mut ack_eliciting = false;
4110 for result in frame::Iter::new(packet.payload.freeze())? {
4111 let frame = result?;
4112 qlog.frame(&frame);
4113 let span = match frame {
4114 Frame::Padding => continue,
4115 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4116 };
4117
4118 self.stats.frame_rx.record(frame.ty());
4119
4120 let _guard = span.as_ref().map(|x| x.enter());
4121 ack_eliciting |= frame.is_ack_eliciting();
4122
4123 if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4125 return Err(TransportError::PROTOCOL_VIOLATION(
4126 "illegal frame type in handshake",
4127 ));
4128 }
4129
4130 match frame {
4131 Frame::Padding | Frame::Ping => {}
4132 Frame::Crypto(frame) => {
4133 self.read_crypto(packet.header.space(), &frame, payload_len)?;
4134 }
4135 Frame::Ack(ack) => {
4136 self.on_ack_received(now, packet.header.space(), ack)?;
4137 }
4138 Frame::PathAck(ack) => {
4139 span.as_ref()
4140 .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4141 self.on_path_ack_received(now, packet.header.space(), ack)?;
4142 }
4143 Frame::Close(reason) => {
4144 self.state.move_to_draining(Some(reason.into()));
4145 return Ok(());
4146 }
4147 _ => {
4148 let mut err =
4149 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4150 err.frame = frame::MaybeFrame::Known(frame.ty());
4151 return Err(err);
4152 }
4153 }
4154 }
4155
4156 if ack_eliciting {
4157 self.spaces[packet.header.space()]
4159 .for_path(path_id)
4160 .pending_acks
4161 .set_immediate_ack_required();
4162 }
4163
4164 self.write_crypto();
4165 Ok(())
4166 }
4167
4168 fn process_payload(
4170 &mut self,
4171 now: Instant,
4172 network_path: FourTuple,
4173 path_id: PathId,
4174 number: u64,
4175 packet: Packet,
4176 #[allow(unused)] qlog: &mut QlogRecvPacket,
4177 ) -> Result<(), TransportError> {
4178 let payload = packet.payload.freeze();
4179 let mut is_probing_packet = true;
4180 let mut close = None;
4181 let payload_len = payload.len();
4182 let mut ack_eliciting = false;
4183 let mut migration_observed_addr = None;
4186 for result in frame::Iter::new(payload)? {
4187 let frame = result?;
4188 qlog.frame(&frame);
4189 let span = match frame {
4190 Frame::Padding => continue,
4191 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4192 };
4193
4194 self.stats.frame_rx.record(frame.ty());
4195 match &frame {
4198 Frame::Crypto(f) => {
4199 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4200 }
4201 Frame::Stream(f) => {
4202 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4203 }
4204 Frame::Datagram(f) => {
4205 trace!(len = f.data.len(), "got datagram frame");
4206 }
4207 f => {
4208 trace!("got frame {f}");
4209 }
4210 }
4211
4212 let _guard = span.enter();
4213 if packet.header.is_0rtt() {
4214 match frame {
4215 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4216 return Err(TransportError::PROTOCOL_VIOLATION(
4217 "illegal frame type in 0-RTT",
4218 ));
4219 }
4220 _ => {
4221 if frame.is_1rtt() {
4222 return Err(TransportError::PROTOCOL_VIOLATION(
4223 "illegal frame type in 0-RTT",
4224 ));
4225 }
4226 }
4227 }
4228 }
4229 ack_eliciting |= frame.is_ack_eliciting();
4230
4231 match frame {
4233 Frame::Padding
4234 | Frame::PathChallenge(_)
4235 | Frame::PathResponse(_)
4236 | Frame::NewConnectionId(_)
4237 | Frame::ObservedAddr(_) => {}
4238 _ => {
4239 is_probing_packet = false;
4240 }
4241 }
4242
4243 match frame {
4244 Frame::Crypto(frame) => {
4245 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4246 }
4247 Frame::Stream(frame) => {
4248 if self.streams.received(frame, payload_len)?.should_transmit() {
4249 self.spaces[SpaceId::Data].pending.max_data = true;
4250 }
4251 }
4252 Frame::Ack(ack) => {
4253 self.on_ack_received(now, SpaceId::Data, ack)?;
4254 }
4255 Frame::PathAck(ack) => {
4256 span.record("path", tracing::field::debug(&ack.path_id));
4257 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4258 }
4259 Frame::Padding | Frame::Ping => {}
4260 Frame::Close(reason) => {
4261 close = Some(reason);
4262 }
4263 Frame::PathChallenge(challenge) => {
4264 let path = &mut self
4265 .path_mut(path_id)
4266 .expect("payload is processed only after the path becomes known");
4267 path.path_responses.push(number, challenge.0, network_path);
4268 if network_path == path.network_path {
4271 match self.peer_supports_ack_frequency() {
4281 true => self.immediate_ack(path_id),
4282 false => {
4283 self.ping_path(path_id).ok();
4284 }
4285 }
4286 }
4287 }
4288 Frame::PathResponse(response) => {
4289 let path = self
4290 .paths
4291 .get_mut(&path_id)
4292 .expect("payload is processed only after the path becomes known");
4293
4294 use PathTimer::*;
4295 use paths::OnPathResponseReceived::*;
4296 match path
4297 .data
4298 .on_path_response_received(now, response.0, network_path)
4299 {
4300 OnPath { was_open } => {
4301 let qlog = self.qlog.with_time(now);
4302
4303 self.timers
4304 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4305 self.timers
4306 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4307
4308 let next_challenge = path
4309 .data
4310 .earliest_expiring_challenge()
4311 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4312 self.timers.set_or_stop(
4313 Timer::PerPath(path_id, PathChallengeLost),
4314 next_challenge,
4315 qlog,
4316 );
4317
4318 if !was_open {
4319 self.events
4320 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4321 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4322 {
4323 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4324 id: path_id,
4325 addr: observed.socket_addr(),
4326 }));
4327 }
4328 }
4329 if let Some((_, ref mut prev)) = path.prev {
4330 prev.challenges_sent.clear();
4331 prev.send_new_challenge = false;
4332 }
4333 }
4334 OffPath => {
4335 debug!("Response to off-path PathChallenge!");
4336 let next_challenge = path
4337 .data
4338 .earliest_expiring_challenge()
4339 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4340 self.timers.set_or_stop(
4341 Timer::PerPath(path_id, PathChallengeLost),
4342 next_challenge,
4343 self.qlog.with_time(now),
4344 );
4345 }
4346 Invalid { expected } => {
4347 debug!(%response, %network_path, %expected, "ignoring invalid PATH_RESPONSE")
4348 }
4349 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4350 }
4351 }
4352 Frame::MaxData(frame::MaxData(bytes)) => {
4353 self.streams.received_max_data(bytes);
4354 }
4355 Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
4356 self.streams.received_max_stream_data(id, offset)?;
4357 }
4358 Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
4359 self.streams.received_max_streams(dir, count)?;
4360 }
4361 Frame::ResetStream(frame) => {
4362 if self.streams.received_reset(frame)?.should_transmit() {
4363 self.spaces[SpaceId::Data].pending.max_data = true;
4364 }
4365 }
4366 Frame::DataBlocked { offset } => {
4367 debug!(offset, "peer claims to be blocked at connection level");
4368 }
4369 Frame::StreamDataBlocked { id, offset } => {
4370 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4371 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4372 return Err(TransportError::STREAM_STATE_ERROR(
4373 "STREAM_DATA_BLOCKED on send-only stream",
4374 ));
4375 }
4376 debug!(
4377 stream = %id,
4378 offset, "peer claims to be blocked at stream level"
4379 );
4380 }
4381 Frame::StreamsBlocked { dir, limit } => {
4382 if limit > MAX_STREAM_COUNT {
4383 return Err(TransportError::FRAME_ENCODING_ERROR(
4384 "unrepresentable stream limit",
4385 ));
4386 }
4387 debug!(
4388 "peer claims to be blocked opening more than {} {} streams",
4389 limit, dir
4390 );
4391 }
4392 Frame::StopSending(frame::StopSending { id, error_code }) => {
4393 if id.initiator() != self.side.side() {
4394 if id.dir() == Dir::Uni {
4395 debug!("got STOP_SENDING on recv-only {}", id);
4396 return Err(TransportError::STREAM_STATE_ERROR(
4397 "STOP_SENDING on recv-only stream",
4398 ));
4399 }
4400 } else if self.streams.is_local_unopened(id) {
4401 return Err(TransportError::STREAM_STATE_ERROR(
4402 "STOP_SENDING on unopened stream",
4403 ));
4404 }
4405 self.streams.received_stop_sending(id, error_code);
4406 }
4407 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4408 if let Some(ref path_id) = path_id {
4409 span.record("path", tracing::field::debug(&path_id));
4410 }
4411 let path_id = path_id.unwrap_or_default();
4412 match self.local_cid_state.get_mut(&path_id) {
4413 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4414 Some(cid_state) => {
4415 let allow_more_cids = cid_state
4416 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4417
4418 let has_path = !self.abandoned_paths.contains(&path_id);
4422 let allow_more_cids = allow_more_cids && has_path;
4423
4424 self.endpoint_events
4425 .push_back(EndpointEventInner::RetireConnectionId(
4426 now,
4427 path_id,
4428 sequence,
4429 allow_more_cids,
4430 ));
4431 }
4432 }
4433 }
4434 Frame::NewConnectionId(frame) => {
4435 let path_id = if let Some(path_id) = frame.path_id {
4436 if !self.is_multipath_negotiated() {
4437 return Err(TransportError::PROTOCOL_VIOLATION(
4438 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4439 ));
4440 }
4441 if path_id > self.local_max_path_id {
4442 return Err(TransportError::PROTOCOL_VIOLATION(
4443 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4444 ));
4445 }
4446 path_id
4447 } else {
4448 PathId::ZERO
4449 };
4450
4451 if self.abandoned_paths.contains(&path_id) {
4452 trace!("ignoring issued CID for abandoned path");
4453 continue;
4454 }
4455 if let Some(ref path_id) = frame.path_id {
4456 span.record("path", tracing::field::debug(&path_id));
4457 }
4458 let rem_cids = self
4459 .rem_cids
4460 .entry(path_id)
4461 .or_insert_with(|| CidQueue::new(frame.id));
4462 if rem_cids.active().is_empty() {
4463 return Err(TransportError::PROTOCOL_VIOLATION(
4464 "NEW_CONNECTION_ID when CIDs aren't in use",
4465 ));
4466 }
4467 if frame.retire_prior_to > frame.sequence {
4468 return Err(TransportError::PROTOCOL_VIOLATION(
4469 "NEW_CONNECTION_ID retiring unissued CIDs",
4470 ));
4471 }
4472
4473 use crate::cid_queue::InsertError;
4474 match rem_cids.insert(frame) {
4475 Ok(None) if self.path(path_id).is_none() => {
4476 self.continue_nat_traversal_round(now);
4479 }
4480 Ok(None) => {}
4481 Ok(Some((retired, reset_token))) => {
4482 let pending_retired =
4483 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4484 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4487 if (pending_retired.len() as u64)
4490 .saturating_add(retired.end.saturating_sub(retired.start))
4491 > MAX_PENDING_RETIRED_CIDS
4492 {
4493 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4494 "queued too many retired CIDs",
4495 ));
4496 }
4497 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4498 self.set_reset_token(path_id, network_path.remote, reset_token);
4500 }
4501 Err(InsertError::ExceedsLimit) => {
4502 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4503 }
4504 Err(InsertError::Retired) => {
4505 trace!("discarding already-retired");
4506 self.spaces[SpaceId::Data]
4510 .pending
4511 .retire_cids
4512 .push((path_id, frame.sequence));
4513 continue;
4514 }
4515 };
4516
4517 if self.side.is_server()
4518 && path_id == PathId::ZERO
4519 && self
4520 .rem_cids
4521 .get(&PathId::ZERO)
4522 .map(|cids| cids.active_seq() == 0)
4523 .unwrap_or_default()
4524 {
4525 self.update_rem_cid(PathId::ZERO);
4528 }
4529 }
4530 Frame::NewToken(NewToken { token }) => {
4531 let ConnectionSide::Client {
4532 token_store,
4533 server_name,
4534 ..
4535 } = &self.side
4536 else {
4537 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4538 };
4539 if token.is_empty() {
4540 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4541 }
4542 trace!("got new token");
4543 token_store.insert(server_name, token);
4544 }
4545 Frame::Datagram(datagram) => {
4546 if self
4547 .datagrams
4548 .received(datagram, &self.config.datagram_receive_buffer_size)?
4549 {
4550 self.events.push_back(Event::DatagramReceived);
4551 }
4552 }
4553 Frame::AckFrequency(ack_frequency) => {
4554 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4557 continue;
4560 }
4561
4562 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4564 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4565
4566 if let Some(timeout) = space
4569 .pending_acks
4570 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4571 {
4572 self.timers.set(
4573 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4574 timeout,
4575 self.qlog.with_time(now),
4576 );
4577 }
4578 }
4579 }
4580 Frame::ImmediateAck => {
4581 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4583 pns.pending_acks.set_immediate_ack_required();
4584 }
4585 }
4586 Frame::HandshakeDone => {
4587 if self.side.is_server() {
4588 return Err(TransportError::PROTOCOL_VIOLATION(
4589 "client sent HANDSHAKE_DONE",
4590 ));
4591 }
4592 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4593 self.discard_space(now, SpaceId::Handshake);
4594 }
4595 self.events.push_back(Event::HandshakeConfirmed);
4596 trace!("handshake confirmed");
4597 }
4598 Frame::ObservedAddr(observed) => {
4599 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4601 if !self
4602 .peer_params
4603 .address_discovery_role
4604 .should_report(&self.config.address_discovery_role)
4605 {
4606 return Err(TransportError::PROTOCOL_VIOLATION(
4607 "received OBSERVED_ADDRESS frame when not negotiated",
4608 ));
4609 }
4610 if packet.header.space() != SpaceId::Data {
4612 return Err(TransportError::PROTOCOL_VIOLATION(
4613 "OBSERVED_ADDRESS frame outside data space",
4614 ));
4615 }
4616
4617 let path = self.path_data_mut(path_id);
4618 if network_path == path.network_path {
4619 if let Some(updated) = path.update_observed_addr_report(observed) {
4620 if path.open {
4621 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4622 id: path_id,
4623 addr: updated,
4624 }));
4625 }
4626 }
4628 } else {
4629 migration_observed_addr = Some(observed)
4631 }
4632 }
4633 Frame::PathAbandon(frame::PathAbandon {
4634 path_id,
4635 error_code,
4636 }) => {
4637 span.record("path", tracing::field::debug(&path_id));
4638 let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4640 Ok(()) => {
4641 trace!("peer abandoned path");
4642 false
4643 }
4644 Err(ClosePathError::LastOpenPath) => {
4645 trace!("peer abandoned last path, closing connection");
4646 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4648 }
4649 Err(ClosePathError::ClosedPath) => {
4650 trace!("peer abandoned already closed path");
4651 true
4652 }
4653 };
4654 if self.path(path_id).is_some() && !already_abandoned {
4659 let delay = self.pto(SpaceId::Data, path_id) * 3;
4664 self.timers.set(
4665 Timer::PerPath(path_id, PathTimer::DiscardPath),
4666 now + delay,
4667 self.qlog.with_time(now),
4668 );
4669 }
4670 }
4671 Frame::PathStatusAvailable(info) => {
4672 span.record("path", tracing::field::debug(&info.path_id));
4673 if self.is_multipath_negotiated() {
4674 self.on_path_status(
4675 info.path_id,
4676 PathStatus::Available,
4677 info.status_seq_no,
4678 );
4679 } else {
4680 return Err(TransportError::PROTOCOL_VIOLATION(
4681 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4682 ));
4683 }
4684 }
4685 Frame::PathStatusBackup(info) => {
4686 span.record("path", tracing::field::debug(&info.path_id));
4687 if self.is_multipath_negotiated() {
4688 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4689 } else {
4690 return Err(TransportError::PROTOCOL_VIOLATION(
4691 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4692 ));
4693 }
4694 }
4695 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4696 span.record("path", tracing::field::debug(&path_id));
4697 if !self.is_multipath_negotiated() {
4698 return Err(TransportError::PROTOCOL_VIOLATION(
4699 "received MAX_PATH_ID frame when multipath was not negotiated",
4700 ));
4701 }
4702 if path_id > self.remote_max_path_id {
4704 self.remote_max_path_id = path_id;
4705 self.issue_first_path_cids(now);
4706 while let Some(true) = self.continue_nat_traversal_round(now) {}
4707 }
4708 }
4709 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4710 if self.is_multipath_negotiated() {
4714 if self.local_max_path_id > max_path_id {
4715 return Err(TransportError::PROTOCOL_VIOLATION(
4716 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4717 ));
4718 }
4719 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4720 } else {
4722 return Err(TransportError::PROTOCOL_VIOLATION(
4723 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4724 ));
4725 }
4726 }
4727 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4728 if self.is_multipath_negotiated() {
4736 if path_id > self.local_max_path_id {
4737 return Err(TransportError::PROTOCOL_VIOLATION(
4738 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4739 ));
4740 }
4741 if next_seq.0
4742 > self
4743 .local_cid_state
4744 .get(&path_id)
4745 .map(|cid_state| cid_state.active_seq().1 + 1)
4746 .unwrap_or_default()
4747 {
4748 return Err(TransportError::PROTOCOL_VIOLATION(
4749 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4750 ));
4751 }
4752 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4753 } else {
4754 return Err(TransportError::PROTOCOL_VIOLATION(
4755 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4756 ));
4757 }
4758 }
4759 Frame::AddAddress(addr) => {
4760 let client_state = match self.iroh_hp.client_side_mut() {
4761 Ok(state) => state,
4762 Err(err) => {
4763 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4764 "Nat traversal(ADD_ADDRESS): {err}"
4765 )));
4766 }
4767 };
4768
4769 if !client_state.check_remote_address(&addr) {
4770 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4772 }
4773
4774 match client_state.add_remote_address(addr) {
4775 Ok(maybe_added) => {
4776 if let Some(added) = maybe_added {
4777 self.events.push_back(Event::NatTraversal(
4778 iroh_hp::Event::AddressAdded(added),
4779 ));
4780 }
4781 }
4782 Err(e) => {
4783 warn!(%e, "failed to add remote address")
4784 }
4785 }
4786 }
4787 Frame::RemoveAddress(addr) => {
4788 let client_state = match self.iroh_hp.client_side_mut() {
4789 Ok(state) => state,
4790 Err(err) => {
4791 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4792 "Nat traversal(REMOVE_ADDRESS): {err}"
4793 )));
4794 }
4795 };
4796 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4797 self.events
4798 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4799 removed_addr,
4800 )));
4801 }
4802 }
4803 Frame::ReachOut(reach_out) => {
4804 let server_state = match self.iroh_hp.server_side_mut() {
4805 Ok(state) => state,
4806 Err(err) => {
4807 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4808 "Nat traversal(REACH_OUT): {err}"
4809 )));
4810 }
4811 };
4812
4813 if let Err(err) = server_state.handle_reach_out(reach_out) {
4814 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4815 "Nat traversal(REACH_OUT): {err}"
4816 )));
4817 }
4818 }
4819 }
4820 }
4821
4822 let space = self.spaces[SpaceId::Data].for_path(path_id);
4823 if space
4824 .pending_acks
4825 .packet_received(now, number, ack_eliciting, &space.dedup)
4826 {
4827 if self.abandoned_paths.contains(&path_id) {
4828 space.pending_acks.set_immediate_ack_required();
4831 } else {
4832 self.timers.set(
4833 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4834 now + self.ack_frequency.max_ack_delay,
4835 self.qlog.with_time(now),
4836 );
4837 }
4838 }
4839
4840 let pending = &mut self.spaces[SpaceId::Data].pending;
4845 self.streams.queue_max_stream_id(pending);
4846
4847 if let Some(reason) = close {
4848 self.state.move_to_draining(Some(reason.into()));
4849 self.close = true;
4850 }
4851
4852 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4853 && !is_probing_packet
4854 && network_path != self.path_data(path_id).network_path
4855 {
4856 let ConnectionSide::Server { ref server_config } = self.side else {
4857 panic!("packets from unknown remote should be dropped by clients");
4858 };
4859 debug_assert!(
4860 server_config.migration,
4861 "migration-initiating packets should have been dropped immediately"
4862 );
4863 self.migrate(path_id, now, network_path, migration_observed_addr);
4864 self.update_rem_cid(path_id);
4866 self.spin = false;
4867 }
4868
4869 Ok(())
4870 }
4871
4872 fn migrate(
4873 &mut self,
4874 path_id: PathId,
4875 now: Instant,
4876 network_path: FourTuple,
4877 observed_addr: Option<ObservedAddr>,
4878 ) {
4879 trace!(%network_path, %path_id, "migration initiated");
4880 self.path_counter = self.path_counter.wrapping_add(1);
4881 let prev_pto = self.pto(SpaceId::Data, path_id);
4888 let known_path = self.paths.get_mut(&path_id).expect("known path");
4889 let path = &mut known_path.data;
4890 let mut new_path = if network_path.remote.is_ipv4()
4891 && network_path.remote.ip() == path.network_path.remote.ip()
4892 {
4893 PathData::from_previous(network_path, path, self.path_counter, now)
4894 } else {
4895 let peer_max_udp_payload_size =
4896 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4897 .unwrap_or(u16::MAX);
4898 PathData::new(
4899 network_path,
4900 self.allow_mtud,
4901 Some(peer_max_udp_payload_size),
4902 self.path_counter,
4903 now,
4904 &self.config,
4905 )
4906 };
4907 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4908 if let Some(report) = observed_addr {
4909 if let Some(updated) = new_path.update_observed_addr_report(report) {
4910 tracing::info!("adding observed addr event from migration");
4911 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4912 id: path_id,
4913 addr: updated,
4914 }));
4915 }
4916 }
4917 new_path.send_new_challenge = true;
4918
4919 let mut prev = mem::replace(path, new_path);
4920 if !prev.is_validating_path() {
4922 prev.send_new_challenge = true;
4923 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4927 }
4928
4929 self.qlog.emit_tuple_assigned(path_id, network_path, now);
4931
4932 self.timers.set(
4933 Timer::PerPath(path_id, PathTimer::PathValidation),
4934 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4935 self.qlog.with_time(now),
4936 );
4937 }
4938
4939 pub fn local_address_changed(&mut self) {
4941 self.update_rem_cid(PathId::ZERO);
4943 self.ping();
4944 }
4945
4946 fn update_rem_cid(&mut self, path_id: PathId) {
4948 let Some((reset_token, retired)) =
4949 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4950 else {
4951 return;
4952 };
4953
4954 self.spaces[SpaceId::Data]
4956 .pending
4957 .retire_cids
4958 .extend(retired.map(|seq| (path_id, seq)));
4959 let remote = self.path_data(path_id).network_path.remote;
4960 self.set_reset_token(path_id, remote, reset_token);
4961 }
4962
4963 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4972 self.endpoint_events
4973 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4974
4975 if path_id == PathId::ZERO {
4981 self.peer_params.stateless_reset_token = Some(reset_token);
4982 }
4983 }
4984
4985 fn issue_first_cids(&mut self, now: Instant) {
4987 if self
4988 .local_cid_state
4989 .get(&PathId::ZERO)
4990 .expect("PathId::ZERO exists when the connection is created")
4991 .cid_len()
4992 == 0
4993 {
4994 return;
4995 }
4996
4997 let mut n = self.peer_params.issue_cids_limit() - 1;
4999 if let ConnectionSide::Server { server_config } = &self.side {
5000 if server_config.has_preferred_address() {
5001 n -= 1;
5003 }
5004 }
5005 self.endpoint_events
5006 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5007 }
5008
5009 fn issue_first_path_cids(&mut self, now: Instant) {
5013 if let Some(max_path_id) = self.max_path_id() {
5014 let mut path_id = self.max_path_id_with_cids.next();
5015 while path_id <= max_path_id {
5016 self.endpoint_events
5017 .push_back(EndpointEventInner::NeedIdentifiers(
5018 path_id,
5019 now,
5020 self.peer_params.issue_cids_limit(),
5021 ));
5022 path_id = path_id.next();
5023 }
5024 self.max_path_id_with_cids = max_path_id;
5025 }
5026 }
5027
5028 fn populate_packet<'a, 'b>(
5036 &mut self,
5037 now: Instant,
5038 space_id: SpaceId,
5039 path_id: PathId,
5040 path_exclusive_only: bool,
5041 builder: &mut PacketBuilder,
5042 ) {
5043 let pn = builder.exact_number;
5044 let is_multipath_negotiated = self.is_multipath_negotiated();
5045 let stats = &mut self.stats.frame_tx;
5046 let space = &mut self.spaces[space_id];
5047 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5048 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5049 space
5050 .for_path(path_id)
5051 .pending_acks
5052 .maybe_ack_non_eliciting();
5053
5054 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5056 trace!("HANDSHAKE_DONE");
5057 builder.encode(frame::HandshakeDone, stats);
5058 }
5059
5060 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5063 while let Some(local_addr) = addresses.pop() {
5064 let reach_out = frame::ReachOut::new(*round, local_addr);
5065 if builder.frame_space_remaining() > reach_out.size() {
5066 trace!(%round, ?local_addr, "REACH_OUT");
5067 builder.encode(reach_out, stats);
5068 } else {
5069 addresses.push(local_addr);
5070 break;
5071 }
5072 }
5073 if addresses.is_empty() {
5074 space.pending.reach_out = None;
5075 }
5076 }
5077
5078 if !path_exclusive_only
5080 && space_id == SpaceId::Data
5081 && self
5082 .config
5083 .address_discovery_role
5084 .should_report(&self.peer_params.address_discovery_role)
5085 && (!path.observed_addr_sent || space.pending.observed_addr)
5086 {
5087 let frame =
5088 frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
5089 if builder.frame_space_remaining() > frame.size() {
5090 trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5091 builder.encode(frame, stats);
5092
5093 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5094 path.observed_addr_sent = true;
5095
5096 space.pending.observed_addr = false;
5097 }
5098 }
5099
5100 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5102 trace!("PING");
5103 builder.encode(frame::Ping, stats);
5104 }
5105
5106 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5108 debug_assert_eq!(
5109 space_id,
5110 SpaceId::Data,
5111 "immediate acks must be sent in the data space"
5112 );
5113 trace!("IMMEDIATE_ACK");
5114 builder.encode(frame::ImmediateAck, stats);
5115 }
5116
5117 if !path_exclusive_only {
5121 for path_id in space
5122 .number_spaces
5123 .iter_mut()
5124 .filter(|(_, pns)| pns.pending_acks.can_send())
5125 .map(|(&path_id, _)| path_id)
5126 .collect::<Vec<_>>()
5127 {
5128 Self::populate_acks(
5129 now,
5130 self.receiving_ecn,
5131 path_id,
5132 space_id,
5133 space,
5134 is_multipath_negotiated,
5135 builder,
5136 stats,
5137 );
5138 }
5139 }
5140
5141 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5143 let sequence_number = self.ack_frequency.next_sequence_number();
5144
5145 let config = self.config.ack_frequency_config.as_ref().unwrap();
5147
5148 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5150 path.rtt.get(),
5151 config,
5152 &self.peer_params,
5153 );
5154
5155 trace!(?max_ack_delay, "ACK_FREQUENCY");
5156
5157 let frame = frame::AckFrequency {
5158 sequence: sequence_number,
5159 ack_eliciting_threshold: config.ack_eliciting_threshold,
5160 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5161 reordering_threshold: config.reordering_threshold,
5162 };
5163 builder.encode(frame, stats);
5164
5165 self.ack_frequency
5166 .ack_frequency_sent(path_id, pn, max_ack_delay);
5167 }
5168
5169 if builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
5171 && space_id == SpaceId::Data
5172 && path.send_new_challenge
5173 && !self.state.is_closed()
5174 {
5176 path.send_new_challenge = false;
5177
5178 let token = self.rng.random();
5180 let info = paths::SentChallengeInfo {
5181 sent_instant: now,
5182 network_path: path.network_path,
5183 };
5184 path.challenges_sent.insert(token, info);
5185 let challenge = frame::PathChallenge(token);
5186 trace!(frame = %challenge);
5187 builder.encode(challenge, stats);
5188 builder.require_padding();
5189 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5190 self.timers.set(
5191 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5192 now + pto,
5193 self.qlog.with_time(now),
5194 );
5195
5196 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5197 space.pending.path_status.insert(path_id);
5199 }
5200
5201 if space_id == SpaceId::Data
5204 && self
5205 .config
5206 .address_discovery_role
5207 .should_report(&self.peer_params.address_discovery_role)
5208 {
5209 let frame = frame::ObservedAddr::new(
5210 path.network_path.remote,
5211 self.next_observed_addr_seq_no,
5212 );
5213 if builder.frame_space_remaining() > frame.size() {
5214 builder.encode(frame, stats);
5215
5216 self.next_observed_addr_seq_no =
5217 self.next_observed_addr_seq_no.saturating_add(1u8);
5218 path.observed_addr_sent = true;
5219
5220 space.pending.observed_addr = false;
5221 }
5222 }
5223 }
5224
5225 if builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
5227 && space_id == SpaceId::Data
5228 {
5229 if let Some(token) = path.path_responses.pop_on_path(path.network_path) {
5230 let response = frame::PathResponse(token);
5231 trace!(frame = %response);
5232 builder.encode(response, stats);
5233 builder.require_padding();
5234
5235 if space_id == SpaceId::Data
5239 && self
5240 .config
5241 .address_discovery_role
5242 .should_report(&self.peer_params.address_discovery_role)
5243 {
5244 let frame = frame::ObservedAddr::new(
5245 path.network_path.remote,
5246 self.next_observed_addr_seq_no,
5247 );
5248 if builder.frame_space_remaining() > frame.size() {
5249 builder.encode(frame, stats);
5250
5251 self.next_observed_addr_seq_no =
5252 self.next_observed_addr_seq_no.saturating_add(1u8);
5253 path.observed_addr_sent = true;
5254
5255 space.pending.observed_addr = false;
5256 }
5257 }
5258 }
5259 }
5260
5261 while !path_exclusive_only
5263 && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
5264 && !is_0rtt
5265 {
5266 let mut frame = match space.pending.crypto.pop_front() {
5267 Some(x) => x,
5268 None => break,
5269 };
5270
5271 let max_crypto_data_size = builder.frame_space_remaining()
5276 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5278 - 2; let len = frame
5281 .data
5282 .len()
5283 .min(2usize.pow(14) - 1)
5284 .min(max_crypto_data_size);
5285
5286 let data = frame.data.split_to(len);
5287 let offset = frame.offset;
5288 let truncated = frame::Crypto { offset, data };
5289 trace!(off = offset, len = truncated.data.len(), "CRYPTO");
5290 builder.encode(truncated, stats);
5291
5292 if !frame.data.is_empty() {
5293 frame.offset += len as u64;
5294 space.pending.crypto.push_front(frame);
5295 }
5296 }
5297
5298 while !path_exclusive_only
5301 && space_id == SpaceId::Data
5302 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
5303 {
5304 let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5305 break;
5306 };
5307 let frame = frame::PathAbandon {
5308 path_id,
5309 error_code,
5310 };
5311 builder.encode(frame, stats);
5312 trace!(%path_id, "PATH_ABANDON");
5313 }
5314
5315 while !path_exclusive_only
5317 && space_id == SpaceId::Data
5318 && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
5319 {
5320 let Some(path_id) = space.pending.path_status.pop_first() else {
5321 break;
5322 };
5323 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5324 trace!(%path_id, "discarding queued path status for unknown path");
5325 continue;
5326 };
5327
5328 let seq = path.status.seq();
5329 match path.local_status() {
5330 PathStatus::Available => {
5331 let frame = frame::PathStatusAvailable {
5332 path_id,
5333 status_seq_no: seq,
5334 };
5335 builder.encode(frame, stats);
5336 trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5337 }
5338 PathStatus::Backup => {
5339 let frame = frame::PathStatusBackup {
5340 path_id,
5341 status_seq_no: seq,
5342 };
5343 builder.encode(frame, stats);
5344 trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5345 }
5346 }
5347 }
5348
5349 if space_id == SpaceId::Data
5351 && space.pending.max_path_id
5352 && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
5353 {
5354 let frame = frame::MaxPathId(self.local_max_path_id);
5355 builder.encode(frame, stats);
5356 space.pending.max_path_id = false;
5357 trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5358 }
5359
5360 if space_id == SpaceId::Data
5362 && space.pending.paths_blocked
5363 && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5364 {
5365 let frame = frame::PathsBlocked(self.remote_max_path_id);
5366 builder.encode(frame, stats);
5367 space.pending.paths_blocked = false;
5368 trace!(max_path_id = %self.remote_max_path_id, "PATHS_BLOCKED");
5369 }
5370
5371 while space_id == SpaceId::Data
5373 && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5374 {
5375 let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5376 break;
5377 };
5378 let next_seq = match self.rem_cids.get(&path_id) {
5379 Some(cid_queue) => cid_queue.active_seq() + 1,
5380 None => 0,
5381 };
5382 let frame = frame::PathCidsBlocked {
5383 path_id,
5384 next_seq: VarInt(next_seq),
5385 };
5386 builder.encode(frame, stats);
5387 trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5388 }
5389
5390 if space_id == SpaceId::Data {
5392 self.streams
5393 .write_control_frames(builder, &mut space.pending, stats);
5394 }
5395
5396 let cid_len = self
5398 .local_cid_state
5399 .values()
5400 .map(|cid_state| cid_state.cid_len())
5401 .max()
5402 .expect("some local CID state must exist");
5403 let new_cid_size_bound =
5404 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5405 while !path_exclusive_only && builder.frame_space_remaining() > new_cid_size_bound {
5406 let issued = match space.pending.new_cids.pop() {
5407 Some(x) => x,
5408 None => break,
5409 };
5410 let retire_prior_to = self
5411 .local_cid_state
5412 .get(&issued.path_id)
5413 .map(|cid_state| cid_state.retire_prior_to())
5414 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5415
5416 let cid_path_id = match is_multipath_negotiated {
5417 true => {
5418 trace!(
5419 path_id = ?issued.path_id,
5420 sequence = issued.sequence,
5421 id = %issued.id,
5422 "PATH_NEW_CONNECTION_ID",
5423 );
5424 Some(issued.path_id)
5425 }
5426 false => {
5427 trace!(
5428 sequence = issued.sequence,
5429 id = %issued.id,
5430 "NEW_CONNECTION_ID"
5431 );
5432 debug_assert_eq!(issued.path_id, PathId::ZERO);
5433 None
5434 }
5435 };
5436 let frame = frame::NewConnectionId {
5437 path_id: cid_path_id,
5438 sequence: issued.sequence,
5439 retire_prior_to,
5440 id: issued.id,
5441 reset_token: issued.reset_token,
5442 };
5443 builder.encode(frame, stats);
5444 }
5445
5446 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5448 while !path_exclusive_only && builder.frame_space_remaining() > retire_cid_bound {
5449 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5450 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5451 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5452 (None, seq)
5453 }
5454 Some((path_id, seq)) => {
5455 trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5456 (Some(path_id), seq)
5457 }
5458 None => break,
5459 };
5460 let frame = frame::RetireConnectionId { path_id, sequence };
5461 builder.encode(frame, stats);
5462 }
5463
5464 let mut sent_datagrams = false;
5466 while !path_exclusive_only
5467 && builder.frame_space_remaining() > Datagram::SIZE_BOUND
5468 && space_id == SpaceId::Data
5469 {
5470 match self.datagrams.write(builder, stats) {
5471 true => {
5472 sent_datagrams = true;
5473 }
5474 false => break,
5475 }
5476 }
5477 if self.datagrams.send_blocked && sent_datagrams {
5478 self.events.push_back(Event::DatagramsUnblocked);
5479 self.datagrams.send_blocked = false;
5480 }
5481
5482 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5483
5484 while let Some(network_path) = space.pending.new_tokens.pop() {
5486 if path_exclusive_only {
5487 break;
5488 }
5489 debug_assert_eq!(space_id, SpaceId::Data);
5490 let ConnectionSide::Server { server_config } = &self.side else {
5491 panic!("NEW_TOKEN frames should not be enqueued by clients");
5492 };
5493
5494 if !network_path.is_probably_same_path(&path.network_path) {
5495 continue;
5500 }
5501
5502 let token = Token::new(
5503 TokenPayload::Validation {
5504 ip: network_path.remote.ip(),
5505 issued: server_config.time_source.now(),
5506 },
5507 &mut self.rng,
5508 );
5509 let new_token = NewToken {
5510 token: token.encode(&*server_config.token_key).into(),
5511 };
5512
5513 if builder.frame_space_remaining() < new_token.size() {
5514 space.pending.new_tokens.push(network_path);
5515 break;
5516 }
5517
5518 trace!("NEW_TOKEN");
5519 builder.encode(new_token, stats);
5520 builder.retransmits_mut().new_tokens.push(network_path);
5521 }
5522
5523 if !path_exclusive_only && space_id == SpaceId::Data {
5525 self.streams
5526 .write_stream_frames(builder, self.config.send_fairness, stats);
5527 }
5528
5529 while space_id == SpaceId::Data
5532 && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
5533 {
5534 if let Some(added_address) = space.pending.add_address.pop_last() {
5535 trace!(
5536 seq = %added_address.seq_no,
5537 ip = ?added_address.ip,
5538 port = added_address.port,
5539 "ADD_ADDRESS",
5540 );
5541 builder.encode(added_address, stats);
5542 } else {
5543 break;
5544 }
5545 }
5546
5547 while space_id == SpaceId::Data
5549 && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
5550 {
5551 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5552 trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5553 builder.encode(removed_address, stats);
5554 } else {
5555 break;
5556 }
5557 }
5558 }
5559
5560 fn populate_acks<'a, 'b>(
5562 now: Instant,
5563 receiving_ecn: bool,
5564 path_id: PathId,
5565 space_id: SpaceId,
5566 space: &mut PacketSpace,
5567 is_multipath_negotiated: bool,
5568 builder: &mut PacketBuilder<'a, 'b>,
5569 stats: &mut FrameStats,
5570 ) {
5571 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5573
5574 debug_assert!(
5575 is_multipath_negotiated || path_id == PathId::ZERO,
5576 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5577 );
5578 if is_multipath_negotiated {
5579 debug_assert!(
5580 space_id == SpaceId::Data || path_id == PathId::ZERO,
5581 "path acks must be sent in 1RTT space (have {space_id:?})"
5582 );
5583 }
5584
5585 let pns = space.for_path(path_id);
5586 let ranges = pns.pending_acks.ranges();
5587 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5588 let ecn = if receiving_ecn {
5589 Some(&pns.ecn_counters)
5590 } else {
5591 None
5592 };
5593
5594 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5595 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5597 let delay = delay_micros >> ack_delay_exp.into_inner();
5598
5599 if is_multipath_negotiated && space_id == SpaceId::Data {
5600 if !ranges.is_empty() {
5601 trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5602 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
5603 builder.encode(frame, stats);
5604 }
5605 } else {
5606 trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5607 builder.encode(frame::Ack::encoder(delay, ranges, ecn), stats);
5608 }
5609 }
5610
    /// Shared bookkeeping for closing the connection: logs the closure and resets all timers.
    fn close_common(&mut self) {
        trace!("connection closed");
        self.timers.reset();
    }
5615
5616 fn set_close_timer(&mut self, now: Instant) {
5617 let pto_max = self.pto_max_path(self.highest_space, true);
5620 self.timers.set(
5621 Timer::Conn(ConnTimer::Close),
5622 now + 3 * pto_max,
5623 self.qlog.with_time(now),
5624 );
5625 }
5626
5627 fn handle_peer_params(
5632 &mut self,
5633 params: TransportParameters,
5634 loc_cid: ConnectionId,
5635 rem_cid: ConnectionId,
5636 now: Instant,
5637 ) -> Result<(), TransportError> {
5638 if Some(self.orig_rem_cid) != params.initial_src_cid
5639 || (self.side.is_client()
5640 && (Some(self.initial_dst_cid) != params.original_dst_cid
5641 || self.retry_src_cid != params.retry_src_cid))
5642 {
5643 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5644 "CID authentication failure",
5645 ));
5646 }
5647 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5648 return Err(TransportError::PROTOCOL_VIOLATION(
5649 "multipath must not use zero-length CIDs",
5650 ));
5651 }
5652
5653 self.set_peer_params(params);
5654 self.qlog.emit_peer_transport_params_received(self, now);
5655
5656 Ok(())
5657 }
5658
5659 fn set_peer_params(&mut self, params: TransportParameters) {
5660 self.streams.set_params(¶ms);
5661 self.idle_timeout =
5662 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5663 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5664
5665 if let Some(ref info) = params.preferred_address {
5666 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5668 path_id: None,
5669 sequence: 1,
5670 id: info.connection_id,
5671 reset_token: info.stateless_reset_token,
5672 retire_prior_to: 0,
5673 })
5674 .expect(
5675 "preferred address CID is the first received, and hence is guaranteed to be legal",
5676 );
5677 let remote = self.path_data(PathId::ZERO).network_path.remote;
5678 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5679 }
5680 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5681
5682 let mut multipath_enabled = None;
5683 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5684 self.config.get_initial_max_path_id(),
5685 params.initial_max_path_id,
5686 ) {
5687 self.local_max_path_id = local_max_path_id;
5689 self.remote_max_path_id = remote_max_path_id;
5690 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5691 debug!(%initial_max_path_id, "multipath negotiated");
5692 multipath_enabled = Some(initial_max_path_id);
5693 }
5694
5695 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5696 self.config
5697 .max_remote_nat_traversal_addresses
5698 .zip(params.max_remote_nat_traversal_addresses)
5699 {
5700 if let Some(max_initial_paths) =
5701 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5702 {
5703 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5704 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5705 self.iroh_hp =
5706 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5707 debug!(
5708 %max_remote_addresses, %max_local_addresses,
5709 "iroh hole punching negotiated"
5710 );
5711
5712 match self.side() {
5713 Side::Client => {
5714 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5715 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5718 } else if max_local_addresses as u64
5719 > params.active_connection_id_limit.into_inner()
5720 {
5721 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5725 }
5726 }
5727 Side::Server => {
5728 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5729 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5730 }
5731 }
5732 }
5733 } else {
5734 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5735 }
5736 }
5737
5738 self.peer_params = params;
5739 let peer_max_udp_payload_size =
5740 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5741 self.path_data_mut(PathId::ZERO)
5742 .mtud
5743 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5744 }
5745
5746 fn decrypt_packet(
5748 &mut self,
5749 now: Instant,
5750 path_id: PathId,
5751 packet: &mut Packet,
5752 ) -> Result<Option<u64>, Option<TransportError>> {
5753 let result = packet_crypto::decrypt_packet_body(
5754 packet,
5755 path_id,
5756 &self.spaces,
5757 self.zero_rtt_crypto.as_ref(),
5758 self.key_phase,
5759 self.prev_crypto.as_ref(),
5760 self.next_crypto.as_ref(),
5761 )?;
5762
5763 let result = match result {
5764 Some(r) => r,
5765 None => return Ok(None),
5766 };
5767
5768 if result.outgoing_key_update_acked {
5769 if let Some(prev) = self.prev_crypto.as_mut() {
5770 prev.end_packet = Some((result.number, now));
5771 self.set_key_discard_timer(now, packet.header.space());
5772 }
5773 }
5774
5775 if result.incoming_key_update {
5776 trace!("key update authenticated");
5777 self.update_keys(Some((result.number, now)), true);
5778 self.set_key_discard_timer(now, packet.header.space());
5779 }
5780
5781 Ok(Some(result.number))
5782 }
5783
    /// Rotates the 1-RTT packet keys by one generation.
    ///
    /// `end_packet` is stored on the previous key generation, and `remote` is stored as its
    /// `update_unacked` flag.
    ///
    /// # Panics
    ///
    /// Panics if no next 1-RTT keys can be derived, or if the data space crypto or
    /// `next_crypto` is unset.
    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
        trace!("executing key update");
        // Derive the key generation that follows the current "next" keys.
        let new = self
            .crypto
            .next_1rtt_keys()
            .expect("only called for `Data` packets");
        // Leave a safety margin below the confidentiality limit of the new keys.
        self.key_phase_size = new
            .local
            .confidentiality_limit()
            .saturating_sub(KEY_UPDATE_MARGIN);
        // Shift the generations: `new` becomes next, the old next becomes current, and the
        // old current keys are returned as `old`.
        let old = mem::replace(
            &mut self.spaces[SpaceId::Data]
                .crypto
                .as_mut()
                .unwrap()
                .packet,
            mem::replace(self.next_crypto.as_mut().unwrap(), new),
        );
        // Nothing has been sent with the new current keys yet, on any path.
        self.spaces[SpaceId::Data]
            .iter_paths_mut()
            .for_each(|s| s.sent_with_keys = 0);
        self.prev_crypto = Some(PrevCrypto {
            crypto: old,
            end_packet,
            update_unacked: remote,
        });
        self.key_phase = !self.key_phase;
    }
5815
5816 fn peer_supports_ack_frequency(&self) -> bool {
5817 self.peer_params.min_ack_delay.is_some()
5818 }
5819
5820 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5825 debug_assert_eq!(
5826 self.highest_space,
5827 SpaceId::Data,
5828 "immediate ack must be written in the data space"
5829 );
5830 self.spaces[self.highest_space]
5831 .for_path(path_id)
5832 .immediate_ack_pending = true;
5833 }
5834
5835 #[cfg(test)]
5837 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5838 let (path_id, first_decode, remaining) = match &event.0 {
5839 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5840 path_id,
5841 first_decode,
5842 remaining,
5843 ..
5844 }) => (path_id, first_decode, remaining),
5845 _ => return None,
5846 };
5847
5848 if remaining.is_some() {
5849 panic!("Packets should never be coalesced in tests");
5850 }
5851
5852 let decrypted_header = packet_crypto::unprotect_header(
5853 first_decode.clone(),
5854 &self.spaces,
5855 self.zero_rtt_crypto.as_ref(),
5856 self.peer_params.stateless_reset_token,
5857 )?;
5858
5859 let mut packet = decrypted_header.packet?;
5860 packet_crypto::decrypt_packet_body(
5861 &mut packet,
5862 *path_id,
5863 &self.spaces,
5864 self.zero_rtt_crypto.as_ref(),
5865 self.key_phase,
5866 self.prev_crypto.as_ref(),
5867 self.next_crypto.as_ref(),
5868 )
5869 .ok()?;
5870
5871 Some(packet.payload.to_vec())
5872 }
5873
5874 #[cfg(test)]
5877 pub(crate) fn bytes_in_flight(&self) -> u64 {
5878 self.path_data(PathId::ZERO).in_flight.bytes
5880 }
5881
5882 #[cfg(test)]
5884 pub(crate) fn congestion_window(&self) -> u64 {
5885 let path = self.path_data(PathId::ZERO);
5886 path.congestion
5887 .window()
5888 .saturating_sub(path.in_flight.bytes)
5889 }
5890
5891 #[cfg(test)]
5893 pub(crate) fn is_idle(&self) -> bool {
5894 let current_timers = self.timers.values();
5895 current_timers
5896 .into_iter()
5897 .filter(|(timer, _)| {
5898 !matches!(
5899 timer,
5900 Timer::Conn(ConnTimer::KeepAlive)
5901 | Timer::PerPath(_, PathTimer::PathKeepAlive)
5902 | Timer::Conn(ConnTimer::PushNewCid)
5903 | Timer::Conn(ConnTimer::KeyDiscard)
5904 )
5905 })
5906 .min_by_key(|(_, time)| *time)
5907 .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
5908 }
5909
5910 #[cfg(test)]
5912 pub(crate) fn using_ecn(&self) -> bool {
5913 self.path_data(PathId::ZERO).sending_ecn
5914 }
5915
5916 #[cfg(test)]
5918 pub(crate) fn total_recvd(&self) -> u64 {
5919 self.path_data(PathId::ZERO).total_recvd
5920 }
5921
5922 #[cfg(test)]
5923 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
5924 self.local_cid_state
5925 .get(&PathId::ZERO)
5926 .unwrap()
5927 .active_seq()
5928 }
5929
5930 #[cfg(test)]
5931 #[track_caller]
5932 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
5933 self.local_cid_state
5934 .get(&PathId(path_id))
5935 .unwrap()
5936 .active_seq()
5937 }
5938
5939 #[cfg(test)]
5942 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
5943 let n = self
5944 .local_cid_state
5945 .get_mut(&PathId::ZERO)
5946 .unwrap()
5947 .assign_retire_seq(v);
5948 self.endpoint_events
5949 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5950 }
5951
5952 #[cfg(test)]
5954 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
5955 self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
5956 }
5957
5958 #[cfg(test)]
5960 pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
5961 self.path_data(path_id).current_mtu()
5962 }
5963
5964 #[cfg(test)]
5966 pub(crate) fn trigger_path_validation(&mut self) {
5967 for path in self.paths.values_mut() {
5968 path.data.send_new_challenge = true;
5969 }
5970 }
5971
5972 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
5983 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
5984 path.data.send_new_challenge
5985 || path
5986 .prev
5987 .as_ref()
5988 .is_some_and(|(_, path)| path.send_new_challenge)
5989 || !path.data.path_responses.is_empty()
5990 });
5991 let other = self.streams.can_send_stream_data()
5992 || self
5993 .datagrams
5994 .outgoing
5995 .front()
5996 .is_some_and(|x| x.size(true) <= max_size);
5997 SendableFrames {
5998 acks: false,
5999 other,
6000 close: false,
6001 path_exclusive,
6002 }
6003 }
6004
    /// Terminates the connection immediately for the given `reason`.
    ///
    /// Runs the common close bookkeeping, moves the state to drained and notifies the
    /// endpoint that this connection is drained.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.state.move_to_drained(Some(reason));
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
6011
6012 pub fn current_mtu(&self) -> u16 {
6019 self.paths
6020 .iter()
6021 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6022 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6023 .min()
6024 .expect("There is always at least one available path")
6025 }
6026
6027 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6034 let pn_len = PacketNumber::new(
6035 pn,
6036 self.spaces[SpaceId::Data]
6037 .for_path(path)
6038 .largest_acked_packet
6039 .unwrap_or(0),
6040 )
6041 .len();
6042
6043 1 + self
6045 .rem_cids
6046 .get(&path)
6047 .map(|cids| cids.active().len())
6048 .unwrap_or(20) + pn_len
6050 + self.tag_len_1rtt()
6051 }
6052
6053 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6054 let pn_len = 4;
6055
6056 let cid_len = self
6057 .rem_cids
6058 .values()
6059 .map(|cids| cids.active().len())
6060 .max()
6061 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6065 }
6066
6067 fn tag_len_1rtt(&self) -> usize {
6068 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6069 Some(crypto) => Some(&*crypto.packet.local),
6070 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6071 };
6072 key.map_or(16, |x| x.tag_len())
6076 }
6077
6078 fn on_path_validated(&mut self, path_id: PathId) {
6080 self.path_data_mut(path_id).validated = true;
6081 let ConnectionSide::Server { server_config } = &self.side else {
6082 return;
6083 };
6084 let network_path = self.path_data(path_id).network_path;
6085 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6086 new_tokens.clear();
6087 for _ in 0..server_config.validation_token.sent {
6088 new_tokens.push(network_path);
6089 }
6090 }
6091
6092 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6094 if let Some(path) = self.paths.get_mut(&path_id) {
6095 path.data.status.remote_update(status, status_seq_no);
6096 } else {
6097 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6098 }
6099 self.events.push_back(
6100 PathEvent::RemoteStatus {
6101 id: path_id,
6102 status,
6103 }
6104 .into(),
6105 );
6106 }
6107
6108 fn max_path_id(&self) -> Option<PathId> {
6117 if self.is_multipath_negotiated() {
6118 Some(self.remote_max_path_id.min(self.local_max_path_id))
6119 } else {
6120 None
6121 }
6122 }
6123
6124 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6126 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6127 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6128 };
6129 Ok(())
6130 }
6131
6132 pub fn remove_nat_traversal_address(
6136 &mut self,
6137 address: SocketAddr,
6138 ) -> Result<(), iroh_hp::Error> {
6139 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6140 self.spaces[SpaceId::Data]
6141 .pending
6142 .remove_address
6143 .insert(removed);
6144 }
6145 Ok(())
6146 }
6147
    /// Returns the local NAT traversal addresses known to the hole punching state.
    ///
    /// # Errors
    ///
    /// Propagates any error from the hole punching state.
    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        self.iroh_hp.get_local_nat_traversal_addresses()
    }
6152
6153 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6155 Ok(self
6156 .iroh_hp
6157 .client_side()?
6158 .get_remote_nat_traversal_addresses())
6159 }
6160
6161 fn open_nat_traversal_path(
6169 &mut self,
6170 now: Instant,
6171 (ip, port): (IpAddr, u16),
6172 ipv6: bool,
6173 ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6174 let remote = match ip {
6176 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6177 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6178 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6179 IpAddr::V6(_) => {
6180 trace!("not using IPv6 nat candidate for IPv4 socket");
6181 return Ok(None);
6182 }
6183 };
6184 let network_path = FourTuple {
6189 remote,
6190 local_ip: None,
6191 };
6192 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6193 Ok((path_id, path_was_known)) => {
6194 if path_was_known {
6195 trace!(%path_id, %remote, "nat traversal: path existed for remote");
6196 }
6197 Ok(Some((path_id, remote, path_was_known)))
6198 }
6199 Err(e) => {
6200 debug!(%remote, %e, "nat traversal: failed to probe remote");
6201 Err(e)
6202 }
6203 }
6204 }
6205
6206 pub fn initiate_nat_traversal_round(
6216 &mut self,
6217 now: Instant,
6218 ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6219 if self.state.is_closed() {
6220 return Err(iroh_hp::Error::Closed);
6221 }
6222
6223 let client_state = self.iroh_hp.client_side_mut()?;
6224 let iroh_hp::NatTraversalRound {
6225 new_round,
6226 reach_out_at,
6227 addresses_to_probe,
6228 prev_round_path_ids,
6229 } = client_state.initiate_nat_traversal_round()?;
6230
6231 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6232
6233 for path_id in prev_round_path_ids {
6234 let validated = self
6237 .path(path_id)
6238 .map(|path| path.validated)
6239 .unwrap_or(false);
6240
6241 if !validated {
6242 let _ = self.close_path(
6243 now,
6244 path_id,
6245 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6246 );
6247 }
6248 }
6249
6250 let mut err = None;
6251
6252 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6253 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6254 let ipv6 = self
6255 .paths
6256 .values()
6257 .any(|p| p.data.network_path.remote.is_ipv6());
6258
6259 for (id, address) in addresses_to_probe {
6260 match self.open_nat_traversal_path(now, address, ipv6) {
6261 Ok(None) => {}
6262 Ok(Some((path_id, remote, path_was_known))) => {
6263 if !path_was_known {
6264 path_ids.push(path_id);
6265 probed_addresses.push(remote);
6266 }
6267 }
6268 Err(e) => {
6269 self.iroh_hp
6270 .client_side_mut()
6271 .expect("validated")
6272 .report_in_continuation(id, e);
6273 err.get_or_insert(e);
6274 }
6275 }
6276 }
6277
6278 if let Some(err) = err {
6279 if probed_addresses.is_empty() {
6281 return Err(iroh_hp::Error::Multipath(err));
6282 }
6283 }
6284
6285 self.iroh_hp
6286 .client_side_mut()
6287 .expect("connection side validated")
6288 .set_round_path_ids(path_ids);
6289
6290 Ok(probed_addresses)
6291 }
6292
6293 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6298 let client_state = self.iroh_hp.client_side_mut().ok()?;
6299 let (id, address) = client_state.continue_nat_traversal_round()?;
6300 let ipv6 = self
6301 .paths
6302 .values()
6303 .any(|p| p.data.network_path.remote.is_ipv6());
6304 let open_result = self.open_nat_traversal_path(now, address, ipv6);
6305 let client_state = self.iroh_hp.client_side_mut().expect("validated");
6306 match open_result {
6307 Ok(None) => Some(true),
6308 Ok(Some((path_id, _remote, path_was_known))) => {
6309 if !path_was_known {
6310 client_state.add_round_path_id(path_id);
6311 }
6312 Some(true)
6313 }
6314 Err(e) => {
6315 client_state.report_in_continuation(id, e);
6316 Some(false)
6317 }
6318 }
6319 }
6320}
6321
6322impl fmt::Debug for Connection {
6323 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6324 f.debug_struct("Connection")
6325 .field("handshake_cid", &self.handshake_cid)
6326 .finish()
6327 }
6328}
6329
/// Describes whether, and for which reason, a path is blocked.
// NOTE(review): variant docs are derived from the variant names only — confirm against
// the code that produces these values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Blocked by the anti-amplification limit.
    AntiAmplification,
    /// Blocked by congestion control.
    Congestion,
    /// Blocked by pacing.
    Pacing,
}
6337
/// Side-specific state of a connection.
enum ConnectionSide {
    /// State held by a client connection.
    Client {
        /// Token initially taken from `token_store` for `server_name`; empty when the store
        /// had none.
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Server name used as the key into `token_store`.
        server_name: String,
    },
    /// State held by a server connection.
    Server {
        /// Configuration of the server that accepted this connection.
        server_config: Arc<ServerConfig>,
    },
}
6350
6351impl ConnectionSide {
6352 fn remote_may_migrate(&self, state: &State) -> bool {
6353 match self {
6354 Self::Server { server_config } => server_config.migration,
6355 Self::Client { .. } => {
6356 if let Some(hs) = state.as_handshake() {
6357 hs.allow_server_migration
6358 } else {
6359 false
6360 }
6361 }
6362 }
6363 }
6364
6365 fn is_client(&self) -> bool {
6366 self.side().is_client()
6367 }
6368
6369 fn is_server(&self) -> bool {
6370 self.side().is_server()
6371 }
6372
6373 fn side(&self) -> Side {
6374 match *self {
6375 Self::Client { .. } => Side::Client,
6376 Self::Server { .. } => Side::Server,
6377 }
6378 }
6379}
6380
6381impl From<SideArgs> for ConnectionSide {
6382 fn from(side: SideArgs) -> Self {
6383 match side {
6384 SideArgs::Client {
6385 token_store,
6386 server_name,
6387 } => Self::Client {
6388 token: token_store.take(&server_name).unwrap_or_default(),
6389 token_store,
6390 server_name,
6391 },
6392 SideArgs::Server {
6393 server_config,
6394 pref_addr_cid: _,
6395 path_validated: _,
6396 } => Self::Server { server_config },
6397 }
6398 }
6399}
6400
/// Side-specific arguments for constructing a connection.
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// Configuration of the accepting server.
        server_config: Arc<ServerConfig>,
        /// Connection ID for the preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already validated.
        path_validated: bool,
    },
}
6413
6414impl SideArgs {
6415 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6416 match *self {
6417 Self::Client { .. } => None,
6418 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6419 }
6420 }
6421
6422 pub(crate) fn path_validated(&self) -> bool {
6423 match *self {
6424 Self::Client { .. } => true,
6425 Self::Server { path_validated, .. } => path_validated,
6426 }
6427 }
6428
6429 pub(crate) fn side(&self) -> Side {
6430 match *self {
6431 Self::Client { .. } => Side::Client,
6432 Self::Server { .. } => Side::Server,
6433 }
6434 }
6435}
6436
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error occurred; displayed as the wrapped [`TransportError`].
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer aborted the connection with a connection-level close.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection with an application-level close.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed.
    // NOTE(review): the variant name suggests a local close — confirm against call sites.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
6471
6472impl From<Close> for ConnectionError {
6473 fn from(x: Close) -> Self {
6474 match x {
6475 Close::Connection(reason) => Self::ConnectionClosed(reason),
6476 Close::Application(reason) => Self::ApplicationClosed(reason),
6477 }
6478 }
6479}
6480
6481impl From<ConnectionError> for io::Error {
6483 fn from(x: ConnectionError) -> Self {
6484 use ConnectionError::*;
6485 let kind = match x {
6486 TimedOut => io::ErrorKind::TimedOut,
6487 Reset => io::ErrorKind::ConnectionReset,
6488 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6489 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6490 io::ErrorKind::Other
6491 }
6492 };
6493 Self::new(kind, x)
6494 }
6495}
6496
6497#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6500pub enum PathError {
6501 #[error("multipath extension not negotiated")]
6503 MultipathNotNegotiated,
6504 #[error("the server side may not open a path")]
6506 ServerSideNotAllowed,
6507 #[error("maximum number of concurrent paths reached")]
6509 MaxPathIdReached,
6510 #[error("remoted CIDs exhausted")]
6512 RemoteCidsExhausted,
6513 #[error("path validation failed")]
6515 ValidationFailed,
6516 #[error("invalid remote address")]
6518 InvalidRemoteAddress(SocketAddr),
6519}
6520
/// Errors that can occur when closing a path.
// NOTE(review): variant docs are derived from the names and messages only — confirm
// against `close_path`.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The path is already closed.
    #[error("closed path")]
    ClosedPath,
    /// The path is the last open path.
    #[error("last open path")]
    LastOpenPath,
}
6531
/// Error indicating that the multipath extension was not negotiated.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private field: the struct cannot be built with a literal outside this module.
    _private: (),
}
6537
/// Events of interest to the application.
// NOTE(review): most variant docs are derived from the variant names — confirm against
// the code that emits them.
#[derive(Debug)]
pub enum Event {
    /// Handshake data is ready.
    HandshakeDataReady,
    /// The connection was established.
    Connected,
    /// The handshake was confirmed.
    HandshakeConfirmed,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// A datagram was received.
    DatagramReceived,
    /// Datagram sending was blocked and datagrams have since been sent.
    DatagramsUnblocked,
    /// A path-related event.
    Path(PathEvent),
    /// A NAT traversal event.
    NatTraversal(iroh_hp::Event),
}
6565
6566impl From<PathEvent> for Event {
6567 fn from(source: PathEvent) -> Self {
6568 Self::Path(source)
6569 }
6570}
6571
6572fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6573 Duration::from_micros(params.max_ack_delay.0 * 1000)
6574}
6575
/// Upper bound for a backoff exponent.
// NOTE(review): presumably caps exponential timer backoff — confirm at the use sites.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// The largest Handshake/0-RTT header size plus 32 bytes.
// NOTE(review): presumably the minimum space worth starting another packet in — confirm
// at the use sites.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest size a Handshake or 0-RTT packet header is assumed to take.
// NOTE(review): the terms look like flags (1) + version (4) + two length-prefixed CIDs +
// a length varint sized for `u16::MAX` + packet number (4) — confirm against the packet
// encoder.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Safety margin subtracted from the confidentiality limit of freshly installed keys when
/// the key phase size is computed during a key update.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6600
/// Record of the frames written into a packet.
#[derive(Default)]
struct SentFrames {
    /// Frames that were sent and may need to be sent again.
    retransmits: ThinRetransmits,
    /// Largest packet number acknowledged per path by the ACK/PATH_ACK frames sent.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the stream frames sent.
    stream_frames: StreamMetaVec,
    /// Set when a frame was sent that is not tracked in `retransmits`
    /// (for example PING or PATH_CHALLENGE).
    non_retransmits: bool,
    /// Whether the packet requires padding.
    // NOTE(review): not set anywhere in the visible code — confirm where it is used.
    requires_padding: bool,
}
6612
6613impl SentFrames {
6614 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6616 !self.largest_acked.is_empty()
6617 && !self.non_retransmits
6618 && self.stream_frames.is_empty()
6619 && self.retransmits.is_empty(streams)
6620 }
6621
6622 fn retransmits_mut(&mut self) -> &mut Retransmits {
6623 self.retransmits.get_or_create()
6624 }
6625
6626 fn sent(&mut self, frame: frame::EncodableFrame<'_>) {
6627 use frame::EncodableFrame::*;
6628 match frame {
6629 PathAck(path_ack_encoder) => {
6630 if let Some(max) = path_ack_encoder.ranges.max() {
6631 self.largest_acked.insert(path_ack_encoder.path_id, max);
6632 }
6633 }
6634 Ack(ack_encoder) => {
6635 if let Some(max) = ack_encoder.ranges.max() {
6636 self.largest_acked.insert(PathId::ZERO, max);
6637 }
6638 }
6639 Close(_) => { }
6640 PathResponse(_) => self.non_retransmits = true,
6641 HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
6642 ReachOut(frame::ReachOut { round, ip, port }) => self
6643 .retransmits_mut()
6644 .reach_out
6645 .get_or_insert_with(|| (round, Vec::new()))
6646 .1
6647 .push((ip, port)),
6648 ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
6649 Ping(_) => self.non_retransmits = true,
6650 ImmediateAck(_) => self.non_retransmits = true,
6651 AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
6652 PathChallenge(_) => self.non_retransmits = true,
6653 Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
6654 PathAbandon(path_abandon) => {
6655 self.retransmits_mut()
6656 .path_abandon
6657 .entry(path_abandon.path_id)
6658 .or_insert(path_abandon.error_code);
6659 }
6660 PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
6661 | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
6662 self.retransmits_mut().path_status.insert(path_id);
6663 }
6664 MaxPathId(_) => self.retransmits_mut().max_path_id = true,
6665 PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
6666 PathCidsBlocked(path_cids_blocked) => self
6667 .retransmits_mut()
6668 .path_cids_blocked
6669 .push(path_cids_blocked.path_id),
6670 ResetStream(reset) => self
6671 .retransmits_mut()
6672 .reset_stream
6673 .push((reset.id, reset.error_code)),
6674 StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
6675 NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
6676 RetireConnectionId(retire_cid) => self
6677 .retransmits_mut()
6678 .retire_cids
6679 .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
6680 Datagram(_) => self.non_retransmits = true,
6681 NewToken(_) => {}
6682 AddAddress(add_address) => {
6683 self.retransmits_mut().add_address.insert(add_address);
6684 }
6685 RemoveAddress(remove_address) => {
6686 self.retransmits_mut().remove_address.insert(remove_address);
6687 }
6688 StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
6689 MaxData(_) => self.retransmits_mut().max_data = true,
6690 MaxStreamData(max) => {
6691 self.retransmits_mut().max_stream_data.insert(max.id);
6692 }
6693 MaxStreams(max_streams) => {
6694 self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
6695 }
6696 }
6697 }
6698}
6699
6700fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6708 match (x, y) {
6709 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6710 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6711 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6712 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6713 }
6714}
6715
#[cfg(test)]
mod tests {
    use super::*;

    /// The negotiated idle timeout must not depend on which side's value is passed first.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let ms = |millis: u64| Some(Duration::from_millis(millis));
        let zero = Some(VarInt(0));

        // (one side, other side, expected outcome)
        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, zero, None),
            (None, Some(VarInt(2)), ms(2)),
            (zero, zero, None),
            (Some(VarInt(2)), zero, ms(2)),
            (Some(VarInt(1)), Some(VarInt(4)), ms(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}