1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::NonZeroU32,
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23 TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 coding::BufMutExt,
27 config::{ServerConfig, TransportConfig},
28 congestion::Controller,
29 connection::{
30 qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
31 spaces::LostPacket,
32 timer::{ConnTimer, PathTimer},
33 },
34 crypto::{self, KeyPair, Keys, PacketKey},
35 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
36 iroh_hp,
37 packet::{
38 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
39 PacketNumber, PartialDecode, SpaceId,
40 },
41 range_set::ArrayRangeSet,
42 shared::{
43 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44 EndpointEvent, EndpointEventInner,
45 },
46 token::{ResetToken, Token, TokenPayload},
47 transport_parameters::TransportParameters,
48};
49
50mod ack_frequency;
51use ack_frequency::AckFrequencyState;
52
53mod assembler;
54pub use assembler::Chunk;
55
56mod cid_state;
57use cid_state::CidState;
58
59mod datagrams;
60use datagrams::DatagramState;
61pub use datagrams::{Datagrams, SendDatagramError};
62
63mod mtud;
64mod pacing;
65
66mod packet_builder;
67use packet_builder::{PacketBuilder, PadDatagram};
68
69mod packet_crypto;
70use packet_crypto::{PrevCrypto, ZeroRttCrypto};
71
72mod paths;
73pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
74use paths::{PathData, PathState};
75
76pub(crate) mod qlog;
77
78mod send_buffer;
79
80mod spaces;
81#[cfg(fuzzing)]
82pub use spaces::Retransmits;
83#[cfg(not(fuzzing))]
84use spaces::Retransmits;
85use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
86
87mod stats;
88pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
89
90mod streams;
91#[cfg(fuzzing)]
92pub use streams::StreamsState;
93#[cfg(not(fuzzing))]
94use streams::StreamsState;
95pub use streams::{
96 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
97 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
98};
99
100mod timer;
101use timer::{Timer, TimerTable};
102
103mod transmit_buf;
104use transmit_buf::TransmitBuf;
105
106mod state;
107
108#[cfg(not(fuzzing))]
109use state::State;
110#[cfg(fuzzing)]
111pub use state::State;
112use state::StateType;
113
114pub struct Connection {
154 endpoint_config: Arc<EndpointConfig>,
155 config: Arc<TransportConfig>,
156 rng: StdRng,
157 crypto: Box<dyn crypto::Session>,
158 handshake_cid: ConnectionId,
160 rem_handshake_cid: ConnectionId,
162 local_ip: Option<IpAddr>,
165 paths: BTreeMap<PathId, PathState>,
171 path_counter: u64,
175 allow_mtud: bool,
177 state: State,
178 side: ConnectionSide,
179 zero_rtt_enabled: bool,
181 zero_rtt_crypto: Option<ZeroRttCrypto>,
183 key_phase: bool,
184 key_phase_size: u64,
186 peer_params: TransportParameters,
188 orig_rem_cid: ConnectionId,
190 initial_dst_cid: ConnectionId,
192 retry_src_cid: Option<ConnectionId>,
195 events: VecDeque<Event>,
197 endpoint_events: VecDeque<EndpointEventInner>,
198 spin_enabled: bool,
200 spin: bool,
202 spaces: [PacketSpace; 3],
204 highest_space: SpaceId,
206 prev_crypto: Option<PrevCrypto>,
208 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
213 accepted_0rtt: bool,
214 permit_idle_reset: bool,
216 idle_timeout: Option<Duration>,
218 timers: TimerTable,
219 authentication_failures: u64,
221
222 close: bool,
227
228 ack_frequency: AckFrequencyState,
232
233 receiving_ecn: bool,
238 total_authed_packets: u64,
240 app_limited: bool,
243
244 next_observed_addr_seq_no: VarInt,
249
250 streams: StreamsState,
251 rem_cids: FxHashMap<PathId, CidQueue>,
257 local_cid_state: FxHashMap<PathId, CidState>,
264 datagrams: DatagramState,
266 stats: ConnectionStats,
268 path_stats: FxHashMap<PathId, PathStats>,
270 version: u32,
272
273 max_concurrent_paths: NonZeroU32,
282 local_max_path_id: PathId,
297 remote_max_path_id: PathId,
303 max_path_id_with_cids: PathId,
309 abandoned_paths: FxHashSet<PathId>,
317
318 iroh_hp: iroh_hp::State,
319 qlog: QlogSink,
320}
321
322impl Connection {
323 pub(crate) fn new(
324 endpoint_config: Arc<EndpointConfig>,
325 config: Arc<TransportConfig>,
326 init_cid: ConnectionId,
327 loc_cid: ConnectionId,
328 rem_cid: ConnectionId,
329 remote: SocketAddr,
330 local_ip: Option<IpAddr>,
331 crypto: Box<dyn crypto::Session>,
332 cid_gen: &dyn ConnectionIdGenerator,
333 now: Instant,
334 version: u32,
335 allow_mtud: bool,
336 rng_seed: [u8; 32],
337 side_args: SideArgs,
338 qlog: QlogSink,
339 ) -> Self {
340 let pref_addr_cid = side_args.pref_addr_cid();
341 let path_validated = side_args.path_validated();
342 let connection_side = ConnectionSide::from(side_args);
343 let side = connection_side.side();
344 let mut rng = StdRng::from_seed(rng_seed);
345 let initial_space = {
346 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
347 space.crypto = Some(crypto.initial_keys(init_cid, side));
348 space
349 };
350 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
351 #[cfg(test)]
352 let data_space = match config.deterministic_packet_numbers {
353 true => PacketSpace::new_deterministic(now, SpaceId::Data),
354 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
355 };
356 #[cfg(not(test))]
357 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
358 let state = State::handshake(state::Handshake {
359 rem_cid_set: side.is_server(),
360 expected_token: Bytes::new(),
361 client_hello: None,
362 allow_server_migration: side.is_client(),
363 });
364 let local_cid_state = FxHashMap::from_iter([(
365 PathId::ZERO,
366 CidState::new(
367 cid_gen.cid_len(),
368 cid_gen.cid_lifetime(),
369 now,
370 if pref_addr_cid.is_some() { 2 } else { 1 },
371 ),
372 )]);
373
374 let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
375 path.open = true;
377 let mut this = Self {
378 endpoint_config,
379 crypto,
380 handshake_cid: loc_cid,
381 rem_handshake_cid: rem_cid,
382 local_cid_state,
383 paths: BTreeMap::from_iter([(
384 PathId::ZERO,
385 PathState {
386 data: path,
387 prev: None,
388 },
389 )]),
390 path_counter: 0,
391 allow_mtud,
392 local_ip,
393 state,
394 side: connection_side,
395 zero_rtt_enabled: false,
396 zero_rtt_crypto: None,
397 key_phase: false,
398 key_phase_size: rng.random_range(10..1000),
405 peer_params: TransportParameters::default(),
406 orig_rem_cid: rem_cid,
407 initial_dst_cid: init_cid,
408 retry_src_cid: None,
409 events: VecDeque::new(),
410 endpoint_events: VecDeque::new(),
411 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
412 spin: false,
413 spaces: [initial_space, handshake_space, data_space],
414 highest_space: SpaceId::Initial,
415 prev_crypto: None,
416 next_crypto: None,
417 accepted_0rtt: false,
418 permit_idle_reset: true,
419 idle_timeout: match config.max_idle_timeout {
420 None | Some(VarInt(0)) => None,
421 Some(dur) => Some(Duration::from_millis(dur.0)),
422 },
423 timers: TimerTable::default(),
424 authentication_failures: 0,
425 close: false,
426
427 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
428 &TransportParameters::default(),
429 )),
430
431 app_limited: false,
432 receiving_ecn: false,
433 total_authed_packets: 0,
434
435 next_observed_addr_seq_no: 0u32.into(),
436
437 streams: StreamsState::new(
438 side,
439 config.max_concurrent_uni_streams,
440 config.max_concurrent_bidi_streams,
441 config.send_window,
442 config.receive_window,
443 config.stream_receive_window,
444 ),
445 datagrams: DatagramState::default(),
446 config,
447 rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
448 rng,
449 stats: ConnectionStats::default(),
450 path_stats: Default::default(),
451 version,
452
453 max_concurrent_paths: NonZeroU32::MIN,
455 local_max_path_id: PathId::ZERO,
456 remote_max_path_id: PathId::ZERO,
457 max_path_id_with_cids: PathId::ZERO,
458 abandoned_paths: Default::default(),
459
460 iroh_hp: Default::default(),
462 qlog,
463 };
464 if path_validated {
465 this.on_path_validated(PathId::ZERO);
466 }
467 if side.is_client() {
468 this.write_crypto();
470 this.init_0rtt(now);
471 }
472 this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
473 this
474 }
475
476 #[must_use]
484 pub fn poll_timeout(&mut self) -> Option<Instant> {
485 self.timers.peek()
486 }
487
488 #[must_use]
494 pub fn poll(&mut self) -> Option<Event> {
495 if let Some(x) = self.events.pop_front() {
496 return Some(x);
497 }
498
499 if let Some(event) = self.streams.poll() {
500 return Some(Event::Stream(event));
501 }
502
503 if let Some(reason) = self.state.take_error() {
504 return Some(Event::ConnectionLost { reason });
505 }
506
507 None
508 }
509
510 #[must_use]
512 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
513 self.endpoint_events.pop_front().map(EndpointEvent)
514 }
515
516 #[must_use]
518 pub fn streams(&mut self) -> Streams<'_> {
519 Streams {
520 state: &mut self.streams,
521 conn_state: &self.state,
522 }
523 }
524
525 #[must_use]
527 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
528 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
529 RecvStream {
530 id,
531 state: &mut self.streams,
532 pending: &mut self.spaces[SpaceId::Data].pending,
533 }
534 }
535
536 #[must_use]
538 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
539 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
540 SendStream {
541 id,
542 state: &mut self.streams,
543 pending: &mut self.spaces[SpaceId::Data].pending,
544 conn_state: &self.state,
545 }
546 }
547
548 pub fn open_path_ensure(
555 &mut self,
556 remote: SocketAddr,
557 initial_status: PathStatus,
558 now: Instant,
559 ) -> Result<(PathId, bool), PathError> {
560 match self
561 .paths
562 .iter()
563 .find(|(_id, path)| path.data.remote == remote)
564 {
565 Some((path_id, _state)) => Ok((*path_id, true)),
566 None => self
567 .open_path(remote, initial_status, now)
568 .map(|id| (id, false)),
569 }
570 }
571
572 pub fn open_path(
577 &mut self,
578 remote: SocketAddr,
579 initial_status: PathStatus,
580 now: Instant,
581 ) -> Result<PathId, PathError> {
582 if !self.is_multipath_negotiated() {
583 return Err(PathError::MultipathNotNegotiated);
584 }
585 if self.side().is_server() {
586 return Err(PathError::ServerSideNotAllowed);
587 }
588
589 let max_abandoned = self.abandoned_paths.iter().max().copied();
590 let max_used = self.paths.keys().last().copied();
591 let path_id = max_abandoned
592 .max(max_used)
593 .unwrap_or(PathId::ZERO)
594 .saturating_add(1u8);
595
596 if Some(path_id) > self.max_path_id() {
597 return Err(PathError::MaxPathIdReached);
598 }
599 if path_id > self.remote_max_path_id {
600 self.spaces[SpaceId::Data].pending.paths_blocked = true;
601 return Err(PathError::MaxPathIdReached);
602 }
603 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
604 self.spaces[SpaceId::Data]
605 .pending
606 .path_cids_blocked
607 .push(path_id);
608 return Err(PathError::RemoteCidsExhausted);
609 }
610
611 let path = self.ensure_path(path_id, remote, now, None);
612 path.status.local_update(initial_status);
613
614 Ok(path_id)
615 }
616
617 pub fn close_path(
623 &mut self,
624 now: Instant,
625 path_id: PathId,
626 error_code: VarInt,
627 ) -> Result<(), ClosePathError> {
628 if self.abandoned_paths.contains(&path_id)
629 || Some(path_id) > self.max_path_id()
630 || !self.paths.contains_key(&path_id)
631 {
632 return Err(ClosePathError::ClosedPath);
633 }
634 if self
635 .paths
636 .iter()
637 .any(|(id, path)| {
639 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
640 })
641 .not()
642 {
643 return Err(ClosePathError::LastOpenPath);
644 }
645
646 self.spaces[SpaceId::Data]
648 .pending
649 .path_abandon
650 .insert(path_id, error_code.into());
651
652 let pending_space = &mut self.spaces[SpaceId::Data].pending;
654 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
655 pending_space.path_cids_blocked.retain(|&id| id != path_id);
656 pending_space.path_status.retain(|&id| id != path_id);
657
658 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
660 for sent_packet in space.sent_packets.values_mut() {
661 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
662 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
663 retransmits.path_cids_blocked.retain(|&id| id != path_id);
664 retransmits.path_status.retain(|&id| id != path_id);
665 }
666 }
667 }
668
669 self.rem_cids.remove(&path_id);
675 self.endpoint_events
676 .push_back(EndpointEventInner::RetireResetToken(path_id));
677
678 let end = self.calculate_end_timer(now, 3, SpaceId::Data);
679 let extended_end = self.calculate_end_timer(now, 6, SpaceId::Data);
680 let path = self.paths.get_mut(&path_id).expect("checked above");
681
682 path.data.last_allowed_receive = Some(end);
684 self.abandoned_paths.insert(path_id);
685
686 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
687
688 self.timers.set(
693 Timer::PerPath(path_id, PathTimer::DiscardPath),
694 extended_end,
695 self.qlog.with_time(now),
696 );
697 Ok(())
698 }
699
700 #[track_caller]
704 fn path_data(&self, path_id: PathId) -> &PathData {
705 if let Some(data) = self.paths.get(&path_id) {
706 &data.data
707 } else {
708 panic!(
709 "unknown path: {path_id}, currently known paths: {:?}",
710 self.paths.keys().collect::<Vec<_>>()
711 );
712 }
713 }
714
715 fn path(&self, path_id: PathId) -> Option<&PathData> {
717 self.paths.get(&path_id).map(|path_state| &path_state.data)
718 }
719
720 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
722 self.paths
723 .get_mut(&path_id)
724 .map(|path_state| &mut path_state.data)
725 }
726
727 pub fn paths(&self) -> Vec<PathId> {
731 self.paths.keys().copied().collect()
732 }
733
734 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
736 self.path(path_id)
737 .map(PathData::local_status)
738 .ok_or(ClosedPath { _private: () })
739 }
740
741 pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
743 self.path(path_id)
744 .map(|path| path.remote)
745 .ok_or(ClosedPath { _private: () })
746 }
747
748 pub fn set_path_status(
752 &mut self,
753 path_id: PathId,
754 status: PathStatus,
755 ) -> Result<PathStatus, SetPathStatusError> {
756 if !self.is_multipath_negotiated() {
757 return Err(SetPathStatusError::MultipathNotNegotiated);
758 }
759 let path = self
760 .path_mut(path_id)
761 .ok_or(SetPathStatusError::ClosedPath)?;
762 let prev = match path.status.local_update(status) {
763 Some(prev) => {
764 self.spaces[SpaceId::Data]
765 .pending
766 .path_status
767 .insert(path_id);
768 prev
769 }
770 None => path.local_status(),
771 };
772 Ok(prev)
773 }
774
775 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
780 self.path(path_id).and_then(|path| path.remote_status())
781 }
782
783 pub fn set_path_max_idle_timeout(
789 &mut self,
790 path_id: PathId,
791 timeout: Option<Duration>,
792 ) -> Result<Option<Duration>, ClosedPath> {
793 let path = self
794 .paths
795 .get_mut(&path_id)
796 .ok_or(ClosedPath { _private: () })?;
797 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
798 }
799
800 pub fn set_path_keep_alive_interval(
806 &mut self,
807 path_id: PathId,
808 interval: Option<Duration>,
809 ) -> Result<Option<Duration>, ClosedPath> {
810 let path = self
811 .paths
812 .get_mut(&path_id)
813 .ok_or(ClosedPath { _private: () })?;
814 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
815 }
816
817 #[track_caller]
821 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
822 &mut self.paths.get_mut(&path_id).expect("known path").data
823 }
824
825 fn ensure_path(
826 &mut self,
827 path_id: PathId,
828 remote: SocketAddr,
829 now: Instant,
830 pn: Option<u64>,
831 ) -> &mut PathData {
832 let vacant_entry = match self.paths.entry(path_id) {
833 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
834 btree_map::Entry::Occupied(occupied_entry) => {
835 return &mut occupied_entry.into_mut().data;
836 }
837 };
838
839 debug!(%path_id, ?remote, "path added");
842 let peer_max_udp_payload_size =
843 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
844 self.path_counter = self.path_counter.wrapping_add(1);
845 let mut data = PathData::new(
846 remote,
847 self.allow_mtud,
848 Some(peer_max_udp_payload_size),
849 self.path_counter,
850 now,
851 &self.config,
852 );
853
854 let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
855 self.timers.set(
856 Timer::PerPath(path_id, PathTimer::PathOpen),
857 now + 3 * pto,
858 self.qlog.with_time(now),
859 );
860
861 data.send_new_challenge = true;
864
865 let path = vacant_entry.insert(PathState { data, prev: None });
866
867 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
868 if let Some(pn) = pn {
869 pn_space.dedup.insert(pn);
870 }
871 self.spaces[SpaceId::Data]
872 .number_spaces
873 .insert(path_id, pn_space);
874 self.qlog.emit_tuple_assigned(path_id, remote, now);
875 &mut path.data
876 }
877
878 #[must_use]
888 pub fn poll_transmit(
889 &mut self,
890 now: Instant,
891 max_datagrams: usize,
892 buf: &mut Vec<u8>,
893 ) -> Option<Transmit> {
894 if let Some(probing) = self
895 .iroh_hp
896 .server_side_mut()
897 .ok()
898 .and_then(iroh_hp::ServerState::next_probe)
899 {
900 let destination = probing.remote();
901 trace!(%destination, "RAND_DATA packet");
902 let token: u64 = self.rng.random();
903 buf.put_u64(token);
904 probing.finish(token);
905 return Some(Transmit {
906 destination,
907 ecn: None,
908 size: 8,
909 segment_size: None,
910 src_ip: None,
911 });
912 }
913
914 assert!(max_datagrams != 0);
915 let max_datagrams = match self.config.enable_segmentation_offload {
916 false => 1,
917 true => max_datagrams,
918 };
919
920 let close = match self.state.as_type() {
939 StateType::Drained => {
940 self.app_limited = true;
941 return None;
942 }
943 StateType::Draining | StateType::Closed => {
944 if !self.close {
947 self.app_limited = true;
948 return None;
949 }
950 true
951 }
952 _ => false,
953 };
954
955 if let Some(config) = &self.config.ack_frequency_config {
957 let rtt = self
958 .paths
959 .values()
960 .map(|p| p.data.rtt.get())
961 .min()
962 .expect("one path exists");
963 self.spaces[SpaceId::Data].pending.ack_frequency = self
964 .ack_frequency
965 .should_send_ack_frequency(rtt, config, &self.peer_params)
966 && self.highest_space == SpaceId::Data
967 && self.peer_supports_ack_frequency();
968 }
969
970 let mut coalesce = true;
972
973 let mut pad_datagram = PadDatagram::No;
976
977 let mut congestion_blocked = false;
981
982 let mut last_packet_number = None;
984
985 let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
986
987 let have_available_path = self.paths.iter().any(|(id, path)| {
990 path.data.validated
991 && path.data.local_status() == PathStatus::Available
992 && self.rem_cids.contains_key(id)
993 });
994
995 let mut transmit = TransmitBuf::new(
997 buf,
998 max_datagrams,
999 self.path_data(path_id).current_mtu().into(),
1000 );
1001 if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1002 return Some(challenge);
1003 }
1004 let mut space_id = match path_id {
1005 PathId::ZERO => SpaceId::Initial,
1006 _ => SpaceId::Data,
1007 };
1008
1009 loop {
1010 let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1012 let err = PathError::RemoteCidsExhausted;
1013 if !self.abandoned_paths.contains(&path_id) {
1014 debug!(?err, %path_id, "no active CID for path");
1015 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1016 id: path_id,
1017 error: err,
1018 }));
1019 self.close_path(
1023 now,
1024 path_id,
1025 TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1026 )
1027 .ok();
1028 self.spaces[SpaceId::Data]
1029 .pending
1030 .path_cids_blocked
1031 .push(path_id);
1032 } else {
1033 trace!(%path_id, "remote CIDs retired for abandoned path");
1034 }
1035
1036 match self.paths.keys().find(|&&next| next > path_id) {
1037 Some(next_path_id) => {
1038 path_id = *next_path_id;
1040 space_id = SpaceId::Data;
1041
1042 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1044 if let Some(challenge) =
1045 self.send_prev_path_challenge(now, &mut transmit, path_id)
1046 {
1047 return Some(challenge);
1048 }
1049
1050 continue;
1051 }
1052 None => {
1053 trace!(
1055 ?space_id,
1056 %path_id,
1057 "no CIDs to send on path, no more paths"
1058 );
1059 break;
1060 }
1061 }
1062 };
1063
1064 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1067 transmit.datagram_remaining_mut()
1069 } else {
1070 transmit.segment_size()
1072 };
1073 let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1074 let path_should_send = {
1075 let path_exclusive_only = space_id == SpaceId::Data
1076 && have_available_path
1077 && self.path_data(path_id).local_status() == PathStatus::Backup;
1078 let path_should_send = if path_exclusive_only {
1079 can_send.path_exclusive
1080 } else {
1081 !can_send.is_empty()
1082 };
1083 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1084 path_should_send || needs_loss_probe || can_send.close
1085 };
1086
1087 if !path_should_send && space_id < SpaceId::Data {
1088 if self.spaces[space_id].crypto.is_some() {
1089 trace!(?space_id, %path_id, "nothing to send in space");
1090 }
1091 space_id = space_id.next();
1092 continue;
1093 }
1094
1095 let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1096 self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1098 } else {
1099 PathBlocked::No
1100 };
1101 if send_blocked != PathBlocked::No {
1102 trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1103 congestion_blocked = true;
1104 }
1105 if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1106 space_id = space_id.next();
1109 continue;
1110 }
1111 if !path_should_send || send_blocked != PathBlocked::No {
1112 if transmit.num_datagrams() > 0 {
1117 break;
1118 }
1119
1120 match self.paths.keys().find(|&&next| next > path_id) {
1121 Some(next_path_id) => {
1122 trace!(
1124 ?space_id,
1125 %path_id,
1126 %next_path_id,
1127 "nothing to send on path"
1128 );
1129 path_id = *next_path_id;
1130 space_id = SpaceId::Data;
1131
1132 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1134 if let Some(challenge) =
1135 self.send_prev_path_challenge(now, &mut transmit, path_id)
1136 {
1137 return Some(challenge);
1138 }
1139
1140 continue;
1141 }
1142 None => {
1143 trace!(
1145 ?space_id,
1146 %path_id,
1147 next_path_id=?None::<PathId>,
1148 "nothing to send on path"
1149 );
1150 break;
1151 }
1152 }
1153 }
1154
1155 if transmit.datagram_remaining_mut() == 0 {
1157 if transmit.num_datagrams() >= transmit.max_datagrams() {
1158 break;
1160 }
1161
1162 match self.spaces[space_id].for_path(path_id).loss_probes {
1163 0 => transmit.start_new_datagram(),
1164 _ => {
1165 let request_immediate_ack =
1167 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1168 self.spaces[space_id].maybe_queue_probe(
1169 path_id,
1170 request_immediate_ack,
1171 &self.streams,
1172 );
1173
1174 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1175
1176 transmit.start_new_datagram_with_size(std::cmp::min(
1180 usize::from(INITIAL_MTU),
1181 transmit.segment_size(),
1182 ));
1183 }
1184 }
1185 trace!(count = transmit.num_datagrams(), "new datagram started");
1186 coalesce = true;
1187 pad_datagram = PadDatagram::No;
1188 }
1189
1190 if transmit.datagram_start_offset() < transmit.len() {
1193 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1194 }
1195
1196 if self.spaces[SpaceId::Initial].crypto.is_some()
1201 && space_id == SpaceId::Handshake
1202 && self.side.is_client()
1203 {
1204 self.discard_space(now, SpaceId::Initial);
1207 }
1208 if let Some(ref mut prev) = self.prev_crypto {
1209 prev.update_unacked = false;
1210 }
1211
1212 let mut qlog = QlogSentPacket::default();
1213 let mut builder = PacketBuilder::new(
1214 now,
1215 space_id,
1216 path_id,
1217 remote_cid,
1218 &mut transmit,
1219 can_send.other,
1220 self,
1221 &mut qlog,
1222 )?;
1223 last_packet_number = Some(builder.exact_number);
1224 coalesce = coalesce && !builder.short_header;
1225
1226 if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1227 pad_datagram |= PadDatagram::ToMinMtu;
1229 }
1230 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1231 pad_datagram |= PadDatagram::ToSegmentSize;
1232 }
1233
1234 if can_send.close {
1235 trace!("sending CONNECTION_CLOSE");
1236 let mut sent_frames = SentFrames::default();
1241 let is_multipath_negotiated = self.is_multipath_negotiated();
1242 for path_id in self.spaces[space_id]
1243 .number_spaces
1244 .iter()
1245 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1246 .map(|(&path_id, _)| path_id)
1247 .collect::<Vec<_>>()
1248 {
1249 Self::populate_acks(
1250 now,
1251 self.receiving_ecn,
1252 &mut sent_frames,
1253 path_id,
1254 space_id,
1255 &mut self.spaces[space_id],
1256 is_multipath_negotiated,
1257 &mut builder.frame_space_mut(),
1258 &mut self.stats,
1259 &mut qlog,
1260 );
1261 }
1262
1263 debug_assert!(
1267 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1268 "ACKs should leave space for ConnectionClose"
1269 );
1270 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1271 let max_frame_size = builder.frame_space_remaining();
1272 match self.state.as_type() {
1273 StateType::Closed => {
1274 let reason: Close =
1275 self.state.as_closed().expect("checked").clone().into();
1276 if space_id == SpaceId::Data || reason.is_transport_layer() {
1277 reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1278 qlog.frame(&Frame::Close(reason));
1279 } else {
1280 let frame = frame::ConnectionClose {
1281 error_code: TransportErrorCode::APPLICATION_ERROR,
1282 frame_type: None,
1283 reason: Bytes::new(),
1284 };
1285 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1286 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1287 }
1288 }
1289 StateType::Draining => {
1290 let frame = frame::ConnectionClose {
1291 error_code: TransportErrorCode::NO_ERROR,
1292 frame_type: None,
1293 reason: Bytes::new(),
1294 };
1295 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1296 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1297 }
1298 _ => unreachable!(
1299 "tried to make a close packet when the connection wasn't closed"
1300 ),
1301 };
1302 }
1303 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1304 if space_id == self.highest_space {
1305 self.close = false;
1308 break;
1310 } else {
1311 space_id = space_id.next();
1315 continue;
1316 }
1317 }
1318
1319 if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1322 let path = self.path_data_mut(path_id);
1323 if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1324 let response = frame::PathResponse(token);
1328 trace!(%response, "(off-path)");
1329 builder.frame_space_mut().write(response);
1330 qlog.frame(&Frame::PathResponse(response));
1331 self.stats.frame_tx.path_response += 1;
1332 builder.finish_and_track(
1333 now,
1334 self,
1335 path_id,
1336 SentFrames {
1337 non_retransmits: true,
1338 ..SentFrames::default()
1339 },
1340 PadDatagram::ToMinMtu,
1341 qlog,
1342 );
1343 self.stats.udp_tx.on_sent(1, transmit.len());
1344 return Some(Transmit {
1345 destination: remote,
1346 size: transmit.len(),
1347 ecn: None,
1348 segment_size: None,
1349 src_ip: self.local_ip,
1350 });
1351 }
1352 }
1353
1354 let sent_frames = {
1355 let path_exclusive_only = have_available_path
1356 && self.path_data(path_id).local_status() == PathStatus::Backup;
1357 let pn = builder.exact_number;
1358 self.populate_packet(
1359 now,
1360 space_id,
1361 path_id,
1362 path_exclusive_only,
1363 &mut builder.frame_space_mut(),
1364 pn,
1365 &mut qlog,
1366 )
1367 };
1368
1369 debug_assert!(
1376 !(sent_frames.is_ack_only(&self.streams)
1377 && !can_send.acks
1378 && can_send.other
1379 && builder.buf.segment_size()
1380 == self.path_data(path_id).current_mtu() as usize
1381 && self.datagrams.outgoing.is_empty()),
1382 "SendableFrames was {can_send:?}, but only ACKs have been written"
1383 );
1384 if sent_frames.requires_padding {
1385 pad_datagram |= PadDatagram::ToMinMtu;
1386 }
1387
1388 for (path_id, _pn) in sent_frames.largest_acked.iter() {
1389 self.spaces[space_id]
1390 .for_path(*path_id)
1391 .pending_acks
1392 .acks_sent();
1393 self.timers.stop(
1394 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1395 self.qlog.with_time(now),
1396 );
1397 }
1398
1399 if coalesce
1407 && builder
1408 .buf
1409 .datagram_remaining_mut()
1410 .saturating_sub(builder.predict_packet_end())
1411 > MIN_PACKET_SPACE
1412 && self
1413 .next_send_space(space_id, path_id, builder.buf, close)
1414 .is_some()
1415 {
1416 builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1419 } else {
1420 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1423 const MAX_PADDING: usize = 32;
1431 if builder.buf.datagram_remaining_mut()
1432 > builder.predict_packet_end() + MAX_PADDING
1433 {
1434 trace!(
1435 "GSO truncated by demand for {} padding bytes",
1436 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1437 );
1438 builder.finish_and_track(
1439 now,
1440 self,
1441 path_id,
1442 sent_frames,
1443 PadDatagram::No,
1444 qlog,
1445 );
1446 break;
1447 }
1448
1449 builder.finish_and_track(
1452 now,
1453 self,
1454 path_id,
1455 sent_frames,
1456 PadDatagram::ToSegmentSize,
1457 qlog,
1458 );
1459 } else {
1460 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1461 }
1462 if transmit.num_datagrams() == 1 {
1463 transmit.clip_datagram_size();
1464 }
1465 }
1466 }
1467
1468 if let Some(last_packet_number) = last_packet_number {
1469 self.path_data_mut(path_id).congestion.on_sent(
1472 now,
1473 transmit.len() as u64,
1474 last_packet_number,
1475 );
1476 }
1477
1478 self.qlog.emit_recovery_metrics(
1479 path_id,
1480 &mut self.paths.get_mut(&path_id).unwrap().data,
1481 now,
1482 );
1483
1484 self.app_limited = transmit.is_empty() && !congestion_blocked;
1485
1486 if transmit.is_empty() && self.state.is_established() {
1488 let space_id = SpaceId::Data;
1490 path_id = *self.paths.first_key_value().expect("one path must exist").0;
1491 let probe_data = loop {
1492 let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1498 let eligible = self.path_data(path_id).validated
1499 && !self.path_data(path_id).is_validating_path()
1500 && !self.abandoned_paths.contains(&path_id);
1501 let probe_size = eligible
1502 .then(|| {
1503 let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1504 self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1505 })
1506 .flatten();
1507 match (active_cid, probe_size) {
1508 (Some(active_cid), Some(probe_size)) => {
1509 break Some((active_cid, probe_size));
1511 }
1512 _ => {
1513 match self.paths.keys().find(|&&next| next > path_id) {
1515 Some(next) => {
1516 path_id = *next;
1517 continue;
1518 }
1519 None => break None,
1520 }
1521 }
1522 }
1523 };
1524 if let Some((active_cid, probe_size)) = probe_data {
1525 debug_assert_eq!(transmit.num_datagrams(), 0);
1527 transmit.start_new_datagram_with_size(probe_size as usize);
1528
1529 let mut qlog = QlogSentPacket::default();
1530 let mut builder = PacketBuilder::new(
1531 now,
1532 space_id,
1533 path_id,
1534 active_cid,
1535 &mut transmit,
1536 true,
1537 self,
1538 &mut qlog,
1539 )?;
1540
1541 trace!(?probe_size, "writing MTUD probe");
1543 trace!("PING");
1544 builder.frame_space_mut().write(frame::FrameType::PING);
1545 qlog.frame(&Frame::Ping);
1546 self.stats.frame_tx.ping += 1;
1547
1548 if self.peer_supports_ack_frequency() {
1550 trace!("IMMEDIATE_ACK");
1551 builder
1552 .frame_space_mut()
1553 .write(frame::FrameType::IMMEDIATE_ACK);
1554 self.stats.frame_tx.immediate_ack += 1;
1555 qlog.frame(&Frame::ImmediateAck);
1556 }
1557
1558 let sent_frames = SentFrames {
1559 non_retransmits: true,
1560 ..Default::default()
1561 };
1562 builder.finish_and_track(
1563 now,
1564 self,
1565 path_id,
1566 sent_frames,
1567 PadDatagram::ToSize(probe_size),
1568 qlog,
1569 );
1570
1571 self.path_stats
1572 .entry(path_id)
1573 .or_default()
1574 .sent_plpmtud_probes += 1;
1575 }
1576 }
1577
1578 if transmit.is_empty() {
1579 return None;
1580 }
1581
1582 let destination = self.path_data(path_id).remote;
1583 trace!(
1584 segment_size = transmit.segment_size(),
1585 last_datagram_len = transmit.len() % transmit.segment_size(),
1586 ?destination,
1587 "sending {} bytes in {} datagrams",
1588 transmit.len(),
1589 transmit.num_datagrams()
1590 );
1591 self.path_data_mut(path_id)
1592 .inc_total_sent(transmit.len() as u64);
1593
1594 self.stats
1595 .udp_tx
1596 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1597
1598 Some(Transmit {
1599 destination,
1600 size: transmit.len(),
1601 ecn: if self.path_data(path_id).sending_ecn {
1602 Some(EcnCodepoint::Ect0)
1603 } else {
1604 None
1605 },
1606 segment_size: match transmit.num_datagrams() {
1607 1 => None,
1608 _ => Some(transmit.segment_size()),
1609 },
1610 src_ip: self.local_ip,
1611 })
1612 }
1613
1614 fn next_send_space(
1619 &mut self,
1620 current_space_id: SpaceId,
1621 path_id: PathId,
1622 buf: &TransmitBuf<'_>,
1623 close: bool,
1624 ) -> Option<SpaceId> {
1625 let mut space_id = current_space_id;
1632 loop {
1633 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1634 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1635 return Some(space_id);
1636 }
1637 space_id = match space_id {
1638 SpaceId::Initial => SpaceId::Handshake,
1639 SpaceId::Handshake => SpaceId::Data,
1640 SpaceId::Data => break,
1641 }
1642 }
1643 None
1644 }
1645
1646 fn path_congestion_check(
1648 &mut self,
1649 space_id: SpaceId,
1650 path_id: PathId,
1651 transmit: &TransmitBuf<'_>,
1652 can_send: &SendableFrames,
1653 now: Instant,
1654 ) -> PathBlocked {
1655 if self.side().is_server()
1661 && self
1662 .path_data(path_id)
1663 .anti_amplification_blocked(transmit.len() as u64 + 1)
1664 {
1665 trace!(?space_id, %path_id, "blocked by anti-amplification");
1666 return PathBlocked::AntiAmplification;
1667 }
1668
1669 let bytes_to_send = transmit.segment_size() as u64;
1672 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1673
1674 if can_send.other && !need_loss_probe && !can_send.close {
1675 let path = self.path_data(path_id);
1676 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1677 trace!(?space_id, %path_id, "blocked by congestion control");
1678 return PathBlocked::Congestion;
1679 }
1680 }
1681
1682 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1684 self.timers.set(
1685 Timer::PerPath(path_id, PathTimer::Pacing),
1686 delay,
1687 self.qlog.with_time(now),
1688 );
1689 trace!(?space_id, %path_id, "blocked by pacing");
1692 return PathBlocked::Pacing;
1693 }
1694
1695 PathBlocked::No
1696 }
1697
1698 fn send_prev_path_challenge(
1703 &mut self,
1704 now: Instant,
1705 buf: &mut TransmitBuf<'_>,
1706 path_id: PathId,
1707 ) -> Option<Transmit> {
1708 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1709 if !prev_path.send_new_challenge {
1712 return None;
1713 };
1714 prev_path.send_new_challenge = false;
1715 let destination = prev_path.remote;
1716 let token = self.rng.random();
1717 let info = paths::SentChallengeInfo {
1718 sent_instant: now,
1719 remote: destination,
1720 };
1721 prev_path.challenges_sent.insert(token, info);
1722 debug_assert_eq!(
1723 self.highest_space,
1724 SpaceId::Data,
1725 "PATH_CHALLENGE queued without 1-RTT keys"
1726 );
1727 buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1728
1729 debug_assert_eq!(buf.datagram_start_offset(), 0);
1735 let mut qlog = QlogSentPacket::default();
1736 let mut builder = PacketBuilder::new(
1737 now,
1738 SpaceId::Data,
1739 path_id,
1740 *prev_cid,
1741 buf,
1742 false,
1743 self,
1744 &mut qlog,
1745 )?;
1746 let challenge = frame::PathChallenge(token);
1747 trace!(%challenge, "validating previous path");
1748 qlog.frame(&Frame::PathChallenge(challenge));
1749 builder.frame_space_mut().write(challenge);
1750 self.stats.frame_tx.path_challenge += 1;
1751
1752 builder.pad_to(MIN_INITIAL_SIZE);
1757
1758 builder.finish(self, now, qlog);
1759 self.stats.udp_tx.on_sent(1, buf.len());
1760
1761 Some(Transmit {
1762 destination,
1763 size: buf.len(),
1764 ecn: None,
1765 segment_size: None,
1766 src_ip: self.local_ip,
1767 })
1768 }
1769
1770 fn space_can_send(
1775 &mut self,
1776 space_id: SpaceId,
1777 path_id: PathId,
1778 packet_size: usize,
1779 close: bool,
1780 ) -> SendableFrames {
1781 let pn = self.spaces[SpaceId::Data]
1782 .for_path(path_id)
1783 .peek_tx_number();
1784 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1785 if self.spaces[space_id].crypto.is_none()
1786 && (space_id != SpaceId::Data
1787 || self.zero_rtt_crypto.is_none()
1788 || self.side.is_server())
1789 {
1790 return SendableFrames::empty();
1792 }
1793 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1794 if space_id == SpaceId::Data {
1795 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1796 }
1797
1798 can_send.close = close && self.spaces[space_id].crypto.is_some();
1799
1800 can_send
1801 }
1802
1803 pub fn handle_event(&mut self, event: ConnectionEvent) {
1809 use ConnectionEventInner::*;
1810 match event.0 {
1811 Datagram(DatagramConnectionEvent {
1812 now,
1813 remote,
1814 path_id,
1815 ecn,
1816 first_decode,
1817 remaining,
1818 }) => {
1819 let span = trace_span!("pkt", %path_id);
1820 let _guard = span.enter();
1821 if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
1825 if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
1826 trace!(
1827 %path_id,
1828 ?remote,
1829 path_remote = ?self.path(path_id).map(|p| p.remote),
1830 "discarding packet from unrecognized peer"
1831 );
1832 return;
1833 }
1834 }
1835
1836 let was_anti_amplification_blocked = self
1837 .path(path_id)
1838 .map(|path| path.anti_amplification_blocked(1))
1839 .unwrap_or(true); self.stats.udp_rx.datagrams += 1;
1843 self.stats.udp_rx.bytes += first_decode.len() as u64;
1844 let data_len = first_decode.len();
1845
1846 self.handle_decode(now, remote, path_id, ecn, first_decode);
1847 if let Some(path) = self.path_mut(path_id) {
1852 path.inc_total_recvd(data_len as u64);
1853 }
1854
1855 if let Some(data) = remaining {
1856 self.stats.udp_rx.bytes += data.len() as u64;
1857 self.handle_coalesced(now, remote, path_id, ecn, data);
1858 }
1859
1860 if let Some(path) = self.paths.get_mut(&path_id) {
1861 self.qlog
1862 .emit_recovery_metrics(path_id, &mut path.data, now);
1863 }
1864
1865 if was_anti_amplification_blocked {
1866 self.set_loss_detection_timer(now, path_id);
1870 }
1871 }
1872 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1873 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1874 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1875 let cid_state = self
1876 .local_cid_state
1877 .entry(path_id)
1878 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1879 cid_state.new_cids(&ids, now);
1880
1881 ids.into_iter().rev().for_each(|frame| {
1882 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1883 });
1884 self.reset_cid_retirement(now);
1886 }
1887 }
1888 }
1889
1890 pub fn handle_timeout(&mut self, now: Instant) {
1900 while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
1901 trace!(?timer, at=?now, "timeout");
1903 match timer {
1904 Timer::Conn(timer) => match timer {
1905 ConnTimer::Close => {
1906 self.state.move_to_drained(None);
1907 self.endpoint_events.push_back(EndpointEventInner::Drained);
1908 }
1909 ConnTimer::Idle => {
1910 self.kill(ConnectionError::TimedOut);
1911 }
1912 ConnTimer::KeepAlive => {
1913 trace!("sending keep-alive");
1914 self.ping();
1915 }
1916 ConnTimer::KeyDiscard => {
1917 self.zero_rtt_crypto = None;
1918 self.prev_crypto = None;
1919 }
1920 ConnTimer::PushNewCid => {
1921 while let Some((path_id, when)) = self.next_cid_retirement() {
1922 if when > now {
1923 break;
1924 }
1925 match self.local_cid_state.get_mut(&path_id) {
1926 None => error!(%path_id, "No local CID state for path"),
1927 Some(cid_state) => {
1928 let num_new_cid = cid_state.on_cid_timeout().into();
1930 if !self.state.is_closed() {
1931 trace!(
1932 "push a new CID to peer RETIRE_PRIOR_TO field {}",
1933 cid_state.retire_prior_to()
1934 );
1935 self.endpoint_events.push_back(
1936 EndpointEventInner::NeedIdentifiers(
1937 path_id,
1938 now,
1939 num_new_cid,
1940 ),
1941 );
1942 }
1943 }
1944 }
1945 }
1946 }
1947 },
1948 Timer::PerPath(path_id, timer) => {
1950 let span = trace_span!("per-path timer fired", %path_id, ?timer);
1951 let _guard = span.enter();
1952 match timer {
1953 PathTimer::PathIdle => {
1954 self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
1955 .ok();
1956 }
1957
1958 PathTimer::PathKeepAlive => {
1959 trace!("sending keep-alive on path");
1960 self.ping_path(path_id).ok();
1961 }
1962 PathTimer::LossDetection => {
1963 self.on_loss_detection_timeout(now, path_id);
1964 self.qlog.emit_recovery_metrics(
1965 path_id,
1966 &mut self.paths.get_mut(&path_id).unwrap().data,
1967 now,
1968 );
1969 }
1970 PathTimer::PathValidation => {
1971 let Some(path) = self.paths.get_mut(&path_id) else {
1972 continue;
1973 };
1974 self.timers.stop(
1975 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
1976 self.qlog.with_time(now),
1977 );
1978 debug!("path validation failed");
1979 if let Some((_, prev)) = path.prev.take() {
1980 path.data = prev;
1981 }
1982 path.data.challenges_sent.clear();
1983 path.data.send_new_challenge = false;
1984 }
1985 PathTimer::PathChallengeLost => {
1986 let Some(path) = self.paths.get_mut(&path_id) else {
1987 continue;
1988 };
1989 trace!("path challenge deemed lost");
1990 path.data.send_new_challenge = true;
1991 }
1992 PathTimer::PathOpen => {
1993 let Some(path) = self.path_mut(path_id) else {
1994 continue;
1995 };
1996 path.challenges_sent.clear();
1997 path.send_new_challenge = false;
1998 debug!("new path validation failed");
1999 if let Err(err) = self.close_path(
2000 now,
2001 path_id,
2002 TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
2003 ) {
2004 warn!(?err, "failed closing path");
2005 }
2006
2007 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
2008 id: path_id,
2009 error: PathError::ValidationFailed,
2010 }));
2011 }
2012 PathTimer::Pacing => trace!("pacing timer expired"),
2013 PathTimer::MaxAckDelay => {
2014 trace!("max ack delay reached");
2015 self.spaces[SpaceId::Data]
2017 .for_path(path_id)
2018 .pending_acks
2019 .on_max_ack_delay_timeout()
2020 }
2021 PathTimer::DiscardPath => {
2022 self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2025 if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
2026 let (min_seq, max_seq) = loc_cid_state.active_seq();
2027 for seq in min_seq..=max_seq {
2028 self.endpoint_events.push_back(
2029 EndpointEventInner::RetireConnectionId(
2030 now, path_id, seq, false,
2031 ),
2032 );
2033 }
2034 }
2035 self.discard_path(path_id, now);
2036 }
2037 }
2038 }
2039 }
2040 }
2041 }
2042
2043 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2055 self.close_inner(
2056 now,
2057 Close::Application(frame::ApplicationClose { error_code, reason }),
2058 )
2059 }
2060
2061 fn close_inner(&mut self, now: Instant, reason: Close) {
2062 let was_closed = self.state.is_closed();
2063 if !was_closed {
2064 self.close_common();
2065 self.set_close_timer(now);
2066 self.close = true;
2067 self.state.move_to_closed_local(reason);
2068 }
2069 }
2070
2071 pub fn datagrams(&mut self) -> Datagrams<'_> {
2073 Datagrams { conn: self }
2074 }
2075
2076 pub fn stats(&mut self) -> ConnectionStats {
2078 self.stats.clone()
2079 }
2080
2081 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2083 let path = self.paths.get(&path_id)?;
2084 let stats = self.path_stats.entry(path_id).or_default();
2085 stats.rtt = path.data.rtt.get();
2086 stats.cwnd = path.data.congestion.window();
2087 stats.current_mtu = path.data.mtud.current_mtu();
2088 Some(*stats)
2089 }
2090
2091 pub fn ping(&mut self) {
2095 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2098 path_data.ping_pending = true;
2099 }
2100 }
2101
2102 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2106 let path_data = self.spaces[self.highest_space]
2107 .number_spaces
2108 .get_mut(&path)
2109 .ok_or(ClosedPath { _private: () })?;
2110 path_data.ping_pending = true;
2111 Ok(())
2112 }
2113
2114 pub fn force_key_update(&mut self) {
2118 if !self.state.is_established() {
2119 debug!("ignoring forced key update in illegal state");
2120 return;
2121 }
2122 if self.prev_crypto.is_some() {
2123 debug!("ignoring redundant forced key update");
2126 return;
2127 }
2128 self.update_keys(None, false);
2129 }
2130
2131 #[doc(hidden)]
2133 #[deprecated]
2134 pub fn initiate_key_update(&mut self) {
2135 self.force_key_update();
2136 }
2137
2138 pub fn crypto_session(&self) -> &dyn crypto::Session {
2140 &*self.crypto
2141 }
2142
2143 pub fn is_handshaking(&self) -> bool {
2148 self.state.is_handshake()
2149 }
2150
2151 pub fn is_closed(&self) -> bool {
2159 self.state.is_closed()
2160 }
2161
2162 pub fn is_drained(&self) -> bool {
2167 self.state.is_drained()
2168 }
2169
2170 pub fn accepted_0rtt(&self) -> bool {
2174 self.accepted_0rtt
2175 }
2176
2177 pub fn has_0rtt(&self) -> bool {
2179 self.zero_rtt_enabled
2180 }
2181
2182 pub fn has_pending_retransmits(&self) -> bool {
2184 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2185 }
2186
2187 pub fn side(&self) -> Side {
2189 self.side.side()
2190 }
2191
2192 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2194 self.path(path_id)
2195 .map(|path_data| {
2196 path_data
2197 .last_observed_addr_report
2198 .as_ref()
2199 .map(|observed| observed.socket_addr())
2200 })
2201 .ok_or(ClosedPath { _private: () })
2202 }
2203
2204 pub fn local_ip(&self) -> Option<IpAddr> {
2214 self.local_ip
2215 }
2216
2217 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2219 self.path(path_id).map(|d| d.rtt.get())
2220 }
2221
2222 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2224 self.path(path_id).map(|d| d.congestion.as_ref())
2225 }
2226
2227 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2232 self.streams.set_max_concurrent(dir, count);
2233 let pending = &mut self.spaces[SpaceId::Data].pending;
2236 self.streams.queue_max_stream_id(pending);
2237 }
2238
2239 pub fn set_max_concurrent_paths(
2249 &mut self,
2250 now: Instant,
2251 count: NonZeroU32,
2252 ) -> Result<(), MultipathNotNegotiated> {
2253 if !self.is_multipath_negotiated() {
2254 return Err(MultipathNotNegotiated { _private: () });
2255 }
2256 self.max_concurrent_paths = count;
2257
2258 let in_use_count = self
2259 .local_max_path_id
2260 .next()
2261 .saturating_sub(self.abandoned_paths.len() as u32)
2262 .as_u32();
2263 let extra_needed = count.get().saturating_sub(in_use_count);
2264 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2265
2266 self.set_max_path_id(now, new_max_path_id);
2267
2268 Ok(())
2269 }
2270
2271 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2273 if max_path_id <= self.local_max_path_id {
2274 return;
2275 }
2276
2277 self.local_max_path_id = max_path_id;
2278 self.spaces[SpaceId::Data].pending.max_path_id = true;
2279
2280 self.issue_first_path_cids(now);
2281 }
2282
2283 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2289 self.streams.max_concurrent(dir)
2290 }
2291
2292 pub fn set_send_window(&mut self, send_window: u64) {
2294 self.streams.set_send_window(send_window);
2295 }
2296
2297 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2299 if self.streams.set_receive_window(receive_window) {
2300 self.spaces[SpaceId::Data].pending.max_data = true;
2301 }
2302 }
2303
2304 pub fn is_multipath_negotiated(&self) -> bool {
2309 !self.is_handshaking()
2310 && self.config.max_concurrent_multipath_paths.is_some()
2311 && self.peer_params.initial_max_path_id.is_some()
2312 }
2313
2314 fn on_ack_received(
2315 &mut self,
2316 now: Instant,
2317 space: SpaceId,
2318 ack: frame::Ack,
2319 ) -> Result<(), TransportError> {
2320 let path = PathId::ZERO;
2322 self.inner_on_ack_received(now, space, path, ack)
2323 }
2324
2325 fn on_path_ack_received(
2326 &mut self,
2327 now: Instant,
2328 space: SpaceId,
2329 path_ack: frame::PathAck,
2330 ) -> Result<(), TransportError> {
2331 let (ack, path) = path_ack.into_ack();
2332 self.inner_on_ack_received(now, space, path, ack)
2333 }
2334
2335 fn inner_on_ack_received(
2337 &mut self,
2338 now: Instant,
2339 space: SpaceId,
2340 path: PathId,
2341 ack: frame::Ack,
2342 ) -> Result<(), TransportError> {
2343 if self.abandoned_paths.contains(&path) {
2344 trace!("silently ignoring PATH_ACK on abandoned path");
2347 return Ok(());
2348 }
2349 if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2350 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2351 }
2352 let new_largest = {
2353 let space = &mut self.spaces[space].for_path(path);
2354 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2355 space.largest_acked_packet = Some(ack.largest);
2356 if let Some(info) = space.sent_packets.get(ack.largest) {
2357 space.largest_acked_packet_sent = info.time_sent;
2361 }
2362 true
2363 } else {
2364 false
2365 }
2366 };
2367
2368 if self.detect_spurious_loss(&ack, space, path) {
2369 self.path_data_mut(path)
2370 .congestion
2371 .on_spurious_congestion_event();
2372 }
2373
2374 let mut newly_acked = ArrayRangeSet::new();
2376 for range in ack.iter() {
2377 self.spaces[space].for_path(path).check_ack(range.clone())?;
2378 for (pn, _) in self.spaces[space]
2379 .for_path(path)
2380 .sent_packets
2381 .iter_range(range)
2382 {
2383 newly_acked.insert_one(pn);
2384 }
2385 }
2386
2387 if newly_acked.is_empty() {
2388 return Ok(());
2389 }
2390
2391 let mut ack_eliciting_acked = false;
2392 for packet in newly_acked.elts() {
2393 if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2394 for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2395 if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2401 pns.pending_acks.subtract_below(*acked_pn);
2402 }
2403 }
2404 ack_eliciting_acked |= info.ack_eliciting;
2405
2406 let path_data = self.path_data_mut(path);
2408 let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
2409 if mtu_updated {
2410 path_data
2411 .congestion
2412 .on_mtu_update(path_data.mtud.current_mtu());
2413 }
2414
2415 self.ack_frequency.on_acked(path, packet);
2417
2418 self.on_packet_acked(now, path, info);
2419 }
2420 }
2421
2422 let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
2423 let app_limited = self.app_limited;
2424 let path_data = self.path_data_mut(path);
2425 let in_flight = path_data.in_flight.bytes;
2426
2427 path_data
2428 .congestion
2429 .on_end_acks(now, in_flight, app_limited, largest_ackd);
2430
2431 if new_largest && ack_eliciting_acked {
2432 let ack_delay = if space != SpaceId::Data {
2433 Duration::from_micros(0)
2434 } else {
2435 cmp::min(
2436 self.ack_frequency.peer_max_ack_delay,
2437 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2438 )
2439 };
2440 let rtt = now.saturating_duration_since(
2441 self.spaces[space].for_path(path).largest_acked_packet_sent,
2442 );
2443
2444 let next_pn = self.spaces[space].for_path(path).next_packet_number;
2445 let path_data = self.path_data_mut(path);
2446 path_data.rtt.update(ack_delay, rtt);
2448 if path_data.first_packet_after_rtt_sample.is_none() {
2449 path_data.first_packet_after_rtt_sample = Some((space, next_pn));
2450 }
2451 }
2452
2453 self.detect_lost_packets(now, space, path, true);
2455
2456 if self.peer_completed_address_validation(path) {
2457 self.path_data_mut(path).pto_count = 0;
2458 }
2459
2460 if self.path_data(path).sending_ecn {
2465 if let Some(ecn) = ack.ecn {
2466 if new_largest {
2471 let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2472 self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2473 }
2474 } else {
2475 debug!("ECN not acknowledged by peer");
2477 self.path_data_mut(path).sending_ecn = false;
2478 }
2479 }
2480
2481 self.set_loss_detection_timer(now, path);
2482 Ok(())
2483 }
2484
2485 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2486 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2487
2488 if lost_packets.is_empty() {
2489 return false;
2490 }
2491
2492 for range in ack.iter() {
2493 let spurious_losses: Vec<u64> = lost_packets
2494 .iter_range(range.clone())
2495 .map(|(pn, _info)| pn)
2496 .collect();
2497
2498 for pn in spurious_losses {
2499 lost_packets.remove(pn);
2500 }
2501 }
2502
2503 lost_packets.is_empty()
2508 }
2509
2510 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2515 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2516
2517 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2518 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2519 }
2520
2521 fn process_ecn(
2523 &mut self,
2524 now: Instant,
2525 space: SpaceId,
2526 path: PathId,
2527 newly_acked: u64,
2528 ecn: frame::EcnCounts,
2529 largest_sent_time: Instant,
2530 ) {
2531 match self.spaces[space]
2532 .for_path(path)
2533 .detect_ecn(newly_acked, ecn)
2534 {
2535 Err(e) => {
2536 debug!("halting ECN due to verification failure: {}", e);
2537
2538 self.path_data_mut(path).sending_ecn = false;
2539 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2542 }
2543 Ok(false) => {}
2544 Ok(true) => {
2545 self.path_stats.entry(path).or_default().congestion_events += 1;
2546 self.path_data_mut(path).congestion.on_congestion_event(
2547 now,
2548 largest_sent_time,
2549 false,
2550 true,
2551 0,
2552 );
2553 }
2554 }
2555 }
2556
2557 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2560 self.paths
2561 .get_mut(&path_id)
2562 .expect("known path")
2563 .remove_in_flight(&info);
2564 let app_limited = self.app_limited;
2565 let path = self.path_data_mut(path_id);
2566 if info.ack_eliciting && !path.is_validating_path() {
2567 let rtt = path.rtt;
2570 path.congestion
2571 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2572 }
2573
2574 if let Some(retransmits) = info.retransmits.get() {
2576 for (id, _) in retransmits.reset_stream.iter() {
2577 self.streams.reset_acked(*id);
2578 }
2579 }
2580
2581 for frame in info.stream_frames {
2582 self.streams.received_ack_of(frame);
2583 }
2584 }
2585
2586 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2587 let start = if self.zero_rtt_crypto.is_some() {
2588 now
2589 } else {
2590 self.prev_crypto
2591 .as_ref()
2592 .expect("no previous keys")
2593 .end_packet
2594 .as_ref()
2595 .expect("update not acknowledged yet")
2596 .1
2597 };
2598
2599 let end = self.calculate_end_timer(start, 3, space);
2601 self.timers.set(
2602 Timer::Conn(ConnTimer::KeyDiscard),
2603 end,
2604 self.qlog.with_time(now),
2605 );
2606 }
2607
2608 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2621 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2622 self.detect_lost_packets(now, pn_space, path_id, false);
2624 self.set_loss_detection_timer(now, path_id);
2625 return;
2626 }
2627
2628 let (_, space) = match self.pto_time_and_space(now, path_id) {
2629 Some(x) => x,
2630 None => {
2631 error!(%path_id, "PTO expired while unset");
2632 return;
2633 }
2634 };
2635 trace!(
2636 in_flight = self.path_data(path_id).in_flight.bytes,
2637 count = self.path_data(path_id).pto_count,
2638 ?space,
2639 %path_id,
2640 "PTO fired"
2641 );
2642
2643 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2644 0 => {
2647 debug_assert!(!self.peer_completed_address_validation(path_id));
2648 1
2649 }
2650 _ => 2,
2652 };
2653 let pns = self.spaces[space].for_path(path_id);
2654 pns.loss_probes = pns.loss_probes.saturating_add(count);
2655 let path_data = self.path_data_mut(path_id);
2656 path_data.pto_count = path_data.pto_count.saturating_add(1);
2657 self.set_loss_detection_timer(now, path_id);
2658 }
2659
2660 fn detect_lost_packets(
2677 &mut self,
2678 now: Instant,
2679 pn_space: SpaceId,
2680 path_id: PathId,
2681 due_to_ack: bool,
2682 ) {
2683 let mut lost_packets = Vec::<u64>::new();
2684 let mut lost_mtu_probe = None;
2685 let mut in_persistent_congestion = false;
2686 let mut size_of_lost_packets = 0u64;
2687 self.spaces[pn_space].for_path(path_id).loss_time = None;
2688
2689 let path = self.path_data(path_id);
2692 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2693 let loss_delay = path
2694 .rtt
2695 .conservative()
2696 .mul_f32(self.config.time_threshold)
2697 .max(TIMER_GRANULARITY);
2698 let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
2699
2700 let largest_acked_packet = self.spaces[pn_space]
2701 .for_path(path_id)
2702 .largest_acked_packet
2703 .expect("detect_lost_packets only to be called if path received at least one ACK");
2704 let packet_threshold = self.config.packet_threshold as u64;
2705
2706 let congestion_period = self
2710 .pto(SpaceId::Data, path_id)
2711 .saturating_mul(self.config.persistent_congestion_threshold);
2712 let mut persistent_congestion_start: Option<Instant> = None;
2713 let mut prev_packet = None;
2714 let space = self.spaces[pn_space].for_path(path_id);
2715
2716 for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
2717 if prev_packet != Some(packet.wrapping_sub(1)) {
2718 persistent_congestion_start = None;
2720 }
2721
2722 let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
2726 if packet_too_old || largest_acked_packet >= packet + packet_threshold {
2727 if Some(packet) == in_flight_mtu_probe {
2729 lost_mtu_probe = in_flight_mtu_probe;
2732 } else {
2733 lost_packets.push(packet);
2734 size_of_lost_packets += info.size as u64;
2735 if info.ack_eliciting && due_to_ack {
2736 match persistent_congestion_start {
2737 Some(start) if info.time_sent - start > congestion_period => {
2740 in_persistent_congestion = true;
2741 }
2742 None if first_packet_after_rtt_sample
2744 .is_some_and(|x| x < (pn_space, packet)) =>
2745 {
2746 persistent_congestion_start = Some(info.time_sent);
2747 }
2748 _ => {}
2749 }
2750 }
2751 }
2752 } else {
2753 if space.loss_time.is_none() {
2755 space.loss_time = Some(info.time_sent + loss_delay);
2758 }
2759 persistent_congestion_start = None;
2760 }
2761
2762 prev_packet = Some(packet);
2763 }
2764
2765 self.handle_lost_packets(
2766 pn_space,
2767 path_id,
2768 now,
2769 lost_packets,
2770 lost_mtu_probe,
2771 loss_delay,
2772 in_persistent_congestion,
2773 size_of_lost_packets,
2774 );
2775 }
2776
2777 fn discard_path(&mut self, path_id: PathId, now: Instant) {
2779 trace!(%path_id, "dropping path state");
2780 let path = self.path_data(path_id);
2781 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2782
2783 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2785 .for_path(path_id)
2786 .sent_packets
2787 .iter()
2788 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2789 .map(|(pn, info)| {
2790 size_of_lost_packets += info.size as u64;
2791 pn
2792 })
2793 .collect();
2794
2795 if !lost_pns.is_empty() {
2796 trace!(
2797 %path_id,
2798 count = lost_pns.len(),
2799 lost_bytes = size_of_lost_packets,
2800 "packets lost on path abandon"
2801 );
2802 self.handle_lost_packets(
2803 SpaceId::Data,
2804 path_id,
2805 now,
2806 lost_pns,
2807 in_flight_mtu_probe,
2808 Duration::ZERO,
2809 false,
2810 size_of_lost_packets,
2811 );
2812 }
2813 self.paths.remove(&path_id);
2814 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2815
2816 let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2817 self.events.push_back(
2818 PathEvent::Abandoned {
2819 id: path_id,
2820 path_stats,
2821 }
2822 .into(),
2823 );
2824 }
2825
2826 fn handle_lost_packets(
2827 &mut self,
2828 pn_space: SpaceId,
2829 path_id: PathId,
2830 now: Instant,
2831 lost_packets: Vec<u64>,
2832 lost_mtu_probe: Option<u64>,
2833 loss_delay: Duration,
2834 in_persistent_congestion: bool,
2835 size_of_lost_packets: u64,
2836 ) {
2837 debug_assert!(
2838 {
2839 let mut sorted = lost_packets.clone();
2840 sorted.sort();
2841 sorted == lost_packets
2842 },
2843 "lost_packets must be sorted"
2844 );
2845
2846 self.drain_lost_packets(now, pn_space, path_id);
2847
2848 if let Some(largest_lost) = lost_packets.last().cloned() {
2850 let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
2851 let largest_lost_sent = self.spaces[pn_space]
2852 .for_path(path_id)
2853 .sent_packets
2854 .get(largest_lost)
2855 .unwrap()
2856 .time_sent;
2857 let path_stats = self.path_stats.entry(path_id).or_default();
2858 path_stats.lost_packets += lost_packets.len() as u64;
2859 path_stats.lost_bytes += size_of_lost_packets;
2860 trace!(
2861 %path_id,
2862 count = lost_packets.len(),
2863 lost_bytes = size_of_lost_packets,
2864 "packets lost",
2865 );
2866
2867 for &packet in &lost_packets {
2868 let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
2869 continue;
2870 };
2871 self.qlog
2872 .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
2873 self.paths
2874 .get_mut(&path_id)
2875 .unwrap()
2876 .remove_in_flight(&info);
2877
2878 for frame in info.stream_frames {
2879 self.streams.retransmit(frame);
2880 }
2881 self.spaces[pn_space].pending |= info.retransmits;
2882 self.path_data_mut(path_id)
2883 .mtud
2884 .on_non_probe_lost(packet, info.size);
2885
2886 self.spaces[pn_space].for_path(path_id).lost_packets.insert(
2887 packet,
2888 LostPacket {
2889 time_sent: info.time_sent,
2890 },
2891 );
2892 }
2893
2894 let path = self.path_data_mut(path_id);
2895 if path.mtud.black_hole_detected(now) {
2896 path.congestion.on_mtu_update(path.mtud.current_mtu());
2897 if let Some(max_datagram_size) = self.datagrams().max_size() {
2898 self.datagrams.drop_oversized(max_datagram_size);
2899 }
2900 self.path_stats
2901 .entry(path_id)
2902 .or_default()
2903 .black_holes_detected += 1;
2904 }
2905
2906 let lost_ack_eliciting =
2908 old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
2909
2910 if lost_ack_eliciting {
2911 self.path_stats
2912 .entry(path_id)
2913 .or_default()
2914 .congestion_events += 1;
2915 self.path_data_mut(path_id).congestion.on_congestion_event(
2916 now,
2917 largest_lost_sent,
2918 in_persistent_congestion,
2919 false,
2920 size_of_lost_packets,
2921 );
2922 }
2923 }
2924
2925 if let Some(packet) = lost_mtu_probe {
2927 let info = self.spaces[SpaceId::Data]
2928 .for_path(path_id)
2929 .take(packet)
2930 .unwrap(); self.paths
2933 .get_mut(&path_id)
2934 .unwrap()
2935 .remove_in_flight(&info);
2936 self.path_data_mut(path_id).mtud.on_probe_lost();
2937 self.path_stats
2938 .entry(path_id)
2939 .or_default()
2940 .lost_plpmtud_probes += 1;
2941 }
2942 }
2943
2944 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2950 SpaceId::iter()
2951 .filter_map(|id| {
2952 self.spaces[id]
2953 .number_spaces
2954 .get(&path_id)
2955 .and_then(|pns| pns.loss_time)
2956 .map(|time| (time, id))
2957 })
2958 .min_by_key(|&(time, _)| time)
2959 }
2960
2961 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2963 let path = self.path(path_id)?;
2964 let pto_count = path.pto_count;
2965 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2966 let mut duration = path.rtt.pto_base() * backoff;
2967
2968 if path_id == PathId::ZERO
2969 && path.in_flight.ack_eliciting == 0
2970 && !self.peer_completed_address_validation(PathId::ZERO)
2971 {
2972 let space = match self.highest_space {
2978 SpaceId::Handshake => SpaceId::Handshake,
2979 _ => SpaceId::Initial,
2980 };
2981
2982 return Some((now + duration, space));
2983 }
2984
2985 let mut result = None;
2986 for space in SpaceId::iter() {
2987 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2988 continue;
2989 };
2990
2991 if !pns.has_in_flight() {
2992 continue;
2993 }
2994 if space == SpaceId::Data {
2995 if self.is_handshaking() {
2997 return result;
2998 }
2999 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3001 }
3002 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3003 continue;
3004 };
3005 let pto = last_ack_eliciting + duration;
3006 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3007 if path.anti_amplification_blocked(1) {
3008 continue;
3010 }
3011 if path.in_flight.ack_eliciting == 0 {
3012 continue;
3014 }
3015 result = Some((pto, space));
3016 }
3017 }
3018 result
3019 }
3020
3021 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3022 if self.side.is_server() || self.state.is_closed() {
3024 return true;
3025 }
3026 self.spaces[SpaceId::Handshake]
3029 .path_space(PathId::ZERO)
3030 .and_then(|pns| pns.largest_acked_packet)
3031 .is_some()
3032 || self.spaces[SpaceId::Data]
3033 .path_space(path)
3034 .and_then(|pns| pns.largest_acked_packet)
3035 .is_some()
3036 || (self.spaces[SpaceId::Data].crypto.is_some()
3037 && self.spaces[SpaceId::Handshake].crypto.is_none())
3038 }
3039
3040 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3048 if self.state.is_closed() {
3049 return;
3053 }
3054
3055 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3056 self.timers.set(
3058 Timer::PerPath(path_id, PathTimer::LossDetection),
3059 loss_time,
3060 self.qlog.with_time(now),
3061 );
3062 return;
3063 }
3064
3065 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3068 self.timers.set(
3069 Timer::PerPath(path_id, PathTimer::LossDetection),
3070 timeout,
3071 self.qlog.with_time(now),
3072 );
3073 } else {
3074 self.timers.stop(
3075 Timer::PerPath(path_id, PathTimer::LossDetection),
3076 self.qlog.with_time(now),
3077 );
3078 }
3079 }
3080
3081 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3086 let max_ack_delay = match space {
3087 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3088 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3089 };
3090 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3091 }
3092
3093 fn on_packet_authenticated(
3094 &mut self,
3095 now: Instant,
3096 space_id: SpaceId,
3097 path_id: PathId,
3098 ecn: Option<EcnCodepoint>,
3099 packet: Option<u64>,
3100 spin: bool,
3101 is_1rtt: bool,
3102 ) {
3103 self.total_authed_packets += 1;
3104 if let Some(last_allowed_receive) = self
3105 .paths
3106 .get(&path_id)
3107 .and_then(|path| path.data.last_allowed_receive)
3108 {
3109 if now > last_allowed_receive {
3110 warn!("received data on path which we abandoned more than 3 * PTO ago");
3111 if !self.state.is_closed() {
3113 self.state.move_to_closed(TransportError::NO_ERROR(
3115 "peer failed to respond with PATH_ABANDON in time",
3116 ));
3117 self.close_common();
3118 self.set_close_timer(now);
3119 self.close = true;
3120 }
3121 return;
3122 }
3123 }
3124
3125 self.reset_keep_alive(path_id, now);
3126 self.reset_idle_timeout(now, space_id, path_id);
3127 self.permit_idle_reset = true;
3128 self.receiving_ecn |= ecn.is_some();
3129 if let Some(x) = ecn {
3130 let space = &mut self.spaces[space_id];
3131 space.for_path(path_id).ecn_counters += x;
3132
3133 if x.is_ce() {
3134 space
3135 .for_path(path_id)
3136 .pending_acks
3137 .set_immediate_ack_required();
3138 }
3139 }
3140
3141 let packet = match packet {
3142 Some(x) => x,
3143 None => return,
3144 };
3145 match &self.side {
3146 ConnectionSide::Client { .. } => {
3147 if space_id == SpaceId::Handshake {
3151 if let Some(hs) = self.state.as_handshake_mut() {
3152 hs.allow_server_migration = false;
3153 }
3154 }
3155 }
3156 ConnectionSide::Server { .. } => {
3157 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3158 {
3159 self.discard_space(now, SpaceId::Initial);
3161 }
3162 if self.zero_rtt_crypto.is_some() && is_1rtt {
3163 self.set_key_discard_timer(now, space_id)
3165 }
3166 }
3167 }
3168 let space = self.spaces[space_id].for_path(path_id);
3169 space.pending_acks.insert_one(packet, now);
3170 if packet >= space.rx_packet.unwrap_or_default() {
3171 space.rx_packet = Some(packet);
3172 self.spin = self.side.is_client() ^ spin;
3174 }
3175 }
3176
3177 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3182 if let Some(timeout) = self.idle_timeout {
3184 if self.state.is_closed() {
3185 self.timers
3186 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3187 } else {
3188 let end = self.calculate_end_timer(now, 3, space);
3190 self.timers
3191 .set(Timer::Conn(ConnTimer::Idle), end, self.qlog.with_time(now));
3192 }
3193 }
3194
3195 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3197 if self.state.is_closed() {
3198 self.timers.stop(
3199 Timer::PerPath(path_id, PathTimer::PathIdle),
3200 self.qlog.with_time(now),
3201 );
3202 } else {
3203 let pto = self.pto(space, path_id);
3204 let end = self.path_data(path_id).timer_offset(now, 3 * pto);
3205 let dt = cmp::max(now + timeout, end);
3206 self.timers.set(
3207 Timer::PerPath(path_id, PathTimer::PathIdle),
3208 dt,
3209 self.qlog.with_time(now),
3210 );
3211 }
3212 }
3213 }
3214
3215 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3217 if !self.state.is_established() {
3218 return;
3219 }
3220
3221 if let Some(interval) = self.config.keep_alive_interval {
3222 self.timers.set(
3223 Timer::Conn(ConnTimer::KeepAlive),
3224 now + interval,
3225 self.qlog.with_time(now),
3226 );
3227 }
3228
3229 if let Some(interval) = self.path_data(path_id).keep_alive {
3230 self.timers.set(
3231 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3232 now + interval,
3233 self.qlog.with_time(now),
3234 );
3235 }
3236 }
3237
3238 fn reset_cid_retirement(&mut self, now: Instant) {
3240 if let Some((_path, t)) = self.next_cid_retirement() {
3241 self.timers.set(
3242 Timer::Conn(ConnTimer::PushNewCid),
3243 t,
3244 self.qlog.with_time(now),
3245 );
3246 }
3247 }
3248
3249 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3251 self.local_cid_state
3252 .iter()
3253 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3254 .min_by_key(|(_path_id, timeout)| *timeout)
3255 }
3256
3257 pub(crate) fn handle_first_packet(
3262 &mut self,
3263 now: Instant,
3264 remote: SocketAddr,
3265 ecn: Option<EcnCodepoint>,
3266 packet_number: u64,
3267 packet: InitialPacket,
3268 remaining: Option<BytesMut>,
3269 ) -> Result<(), ConnectionError> {
3270 let span = trace_span!("first recv");
3271 let _guard = span.enter();
3272 debug_assert!(self.side.is_server());
3273 let len = packet.header_data.len() + packet.payload.len();
3274 let path_id = PathId::ZERO;
3275 self.path_data_mut(path_id).total_recvd = len as u64;
3276
3277 if let Some(hs) = self.state.as_handshake_mut() {
3278 hs.expected_token = packet.header.token.clone();
3279 } else {
3280 unreachable!("first packet must be delivered in Handshake state");
3281 }
3282
3283 self.on_packet_authenticated(
3285 now,
3286 SpaceId::Initial,
3287 path_id,
3288 ecn,
3289 Some(packet_number),
3290 false,
3291 false,
3292 );
3293
3294 let packet: Packet = packet.into();
3295
3296 let mut qlog = QlogRecvPacket::new(len);
3297 qlog.header(&packet.header, Some(packet_number), path_id);
3298
3299 self.process_decrypted_packet(
3300 now,
3301 remote,
3302 path_id,
3303 Some(packet_number),
3304 packet,
3305 &mut qlog,
3306 )?;
3307 self.qlog.emit_packet_received(qlog, now);
3308 if let Some(data) = remaining {
3309 self.handle_coalesced(now, remote, path_id, ecn, data);
3310 }
3311
3312 self.qlog.emit_recovery_metrics(
3313 path_id,
3314 &mut self.paths.get_mut(&path_id).unwrap().data,
3315 now,
3316 );
3317
3318 Ok(())
3319 }
3320
3321 fn init_0rtt(&mut self, now: Instant) {
3322 let (header, packet) = match self.crypto.early_crypto() {
3323 Some(x) => x,
3324 None => return,
3325 };
3326 if self.side.is_client() {
3327 match self.crypto.transport_parameters() {
3328 Ok(params) => {
3329 let params = params
3330 .expect("crypto layer didn't supply transport parameters with ticket");
3331 let params = TransportParameters {
3333 initial_src_cid: None,
3334 original_dst_cid: None,
3335 preferred_address: None,
3336 retry_src_cid: None,
3337 stateless_reset_token: None,
3338 min_ack_delay: None,
3339 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3340 max_ack_delay: TransportParameters::default().max_ack_delay,
3341 initial_max_path_id: None,
3342 ..params
3343 };
3344 self.set_peer_params(params);
3345 self.qlog.emit_peer_transport_params_restored(self, now);
3346 }
3347 Err(e) => {
3348 error!("session ticket has malformed transport parameters: {}", e);
3349 return;
3350 }
3351 }
3352 }
3353 trace!("0-RTT enabled");
3354 self.zero_rtt_enabled = true;
3355 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3356 }
3357
3358 fn read_crypto(
3359 &mut self,
3360 space: SpaceId,
3361 crypto: &frame::Crypto,
3362 payload_len: usize,
3363 ) -> Result<(), TransportError> {
3364 let expected = if !self.state.is_handshake() {
3365 SpaceId::Data
3366 } else if self.highest_space == SpaceId::Initial {
3367 SpaceId::Initial
3368 } else {
3369 SpaceId::Handshake
3372 };
3373 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3377
3378 let end = crypto.offset + crypto.data.len() as u64;
3379 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3380 warn!(
3381 "received new {:?} CRYPTO data when expecting {:?}",
3382 space, expected
3383 );
3384 return Err(TransportError::PROTOCOL_VIOLATION(
3385 "new data at unexpected encryption level",
3386 ));
3387 }
3388
3389 let space = &mut self.spaces[space];
3390 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3391 if max > self.config.crypto_buffer_size as u64 {
3392 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3393 }
3394
3395 space
3396 .crypto_stream
3397 .insert(crypto.offset, crypto.data.clone(), payload_len);
3398 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3399 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3400 if self.crypto.read_handshake(&chunk.bytes)? {
3401 self.events.push_back(Event::HandshakeDataReady);
3402 }
3403 }
3404
3405 Ok(())
3406 }
3407
3408 fn write_crypto(&mut self) {
3409 loop {
3410 let space = self.highest_space;
3411 let mut outgoing = Vec::new();
3412 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3413 match space {
3414 SpaceId::Initial => {
3415 self.upgrade_crypto(SpaceId::Handshake, crypto);
3416 }
3417 SpaceId::Handshake => {
3418 self.upgrade_crypto(SpaceId::Data, crypto);
3419 }
3420 _ => unreachable!("got updated secrets during 1-RTT"),
3421 }
3422 }
3423 if outgoing.is_empty() {
3424 if space == self.highest_space {
3425 break;
3426 } else {
3427 continue;
3429 }
3430 }
3431 let offset = self.spaces[space].crypto_offset;
3432 let outgoing = Bytes::from(outgoing);
3433 if let Some(hs) = self.state.as_handshake_mut() {
3434 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3435 hs.client_hello = Some(outgoing.clone());
3436 }
3437 }
3438 self.spaces[space].crypto_offset += outgoing.len() as u64;
3439 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3440 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3441 offset,
3442 data: outgoing,
3443 });
3444 }
3445 }
3446
3447 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3449 debug_assert!(
3450 self.spaces[space].crypto.is_none(),
3451 "already reached packet space {space:?}"
3452 );
3453 trace!("{:?} keys ready", space);
3454 if space == SpaceId::Data {
3455 self.next_crypto = Some(
3457 self.crypto
3458 .next_1rtt_keys()
3459 .expect("handshake should be complete"),
3460 );
3461 }
3462
3463 self.spaces[space].crypto = Some(crypto);
3464 debug_assert!(space as usize > self.highest_space as usize);
3465 self.highest_space = space;
3466 if space == SpaceId::Data && self.side.is_client() {
3467 self.zero_rtt_crypto = None;
3469 }
3470 }
3471
3472 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3473 debug_assert!(space_id != SpaceId::Data);
3474 trace!("discarding {:?} keys", space_id);
3475 if space_id == SpaceId::Initial {
3476 if let ConnectionSide::Client { token, .. } = &mut self.side {
3478 *token = Bytes::new();
3479 }
3480 }
3481 let space = &mut self.spaces[space_id];
3482 space.crypto = None;
3483 let pns = space.for_path(PathId::ZERO);
3484 pns.time_of_last_ack_eliciting_packet = None;
3485 pns.loss_time = None;
3486 pns.loss_probes = 0;
3487 let sent_packets = mem::take(&mut pns.sent_packets);
3488 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3489 for (_, packet) in sent_packets.into_iter() {
3490 path.data.remove_in_flight(&packet);
3491 }
3492
3493 self.set_loss_detection_timer(now, PathId::ZERO)
3494 }
3495
3496 fn handle_coalesced(
3497 &mut self,
3498 now: Instant,
3499 remote: SocketAddr,
3500 path_id: PathId,
3501 ecn: Option<EcnCodepoint>,
3502 data: BytesMut,
3503 ) {
3504 self.path_data_mut(path_id)
3505 .inc_total_recvd(data.len() as u64);
3506 let mut remaining = Some(data);
3507 let cid_len = self
3508 .local_cid_state
3509 .values()
3510 .map(|cid_state| cid_state.cid_len())
3511 .next()
3512 .expect("one cid_state must exist");
3513 while let Some(data) = remaining {
3514 match PartialDecode::new(
3515 data,
3516 &FixedLengthConnectionIdParser::new(cid_len),
3517 &[self.version],
3518 self.endpoint_config.grease_quic_bit,
3519 ) {
3520 Ok((partial_decode, rest)) => {
3521 remaining = rest;
3522 self.handle_decode(now, remote, path_id, ecn, partial_decode);
3523 }
3524 Err(e) => {
3525 trace!("malformed header: {}", e);
3526 return;
3527 }
3528 }
3529 }
3530 }
3531
3532 fn handle_decode(
3533 &mut self,
3534 now: Instant,
3535 remote: SocketAddr,
3536 path_id: PathId,
3537 ecn: Option<EcnCodepoint>,
3538 partial_decode: PartialDecode,
3539 ) {
3540 let qlog = QlogRecvPacket::new(partial_decode.len());
3541 if let Some(decoded) = packet_crypto::unprotect_header(
3542 partial_decode,
3543 &self.spaces,
3544 self.zero_rtt_crypto.as_ref(),
3545 self.peer_params.stateless_reset_token,
3546 ) {
3547 self.handle_packet(
3548 now,
3549 remote,
3550 path_id,
3551 ecn,
3552 decoded.packet,
3553 decoded.stateless_reset,
3554 qlog,
3555 );
3556 }
3557 }
3558
3559 fn handle_packet(
3560 &mut self,
3561 now: Instant,
3562 remote: SocketAddr,
3563 path_id: PathId,
3564 ecn: Option<EcnCodepoint>,
3565 packet: Option<Packet>,
3566 stateless_reset: bool,
3567 mut qlog: QlogRecvPacket,
3568 ) {
3569 self.stats.udp_rx.ios += 1;
3570 if let Some(ref packet) = packet {
3571 trace!(
3572 "got {:?} packet ({} bytes) from {} using id {}",
3573 packet.header.space(),
3574 packet.payload.len() + packet.header_data.len(),
3575 remote,
3576 packet.header.dst_cid(),
3577 );
3578 }
3579
3580 if self.is_handshaking() {
3581 if path_id != PathId::ZERO {
3582 debug!(%remote, %path_id, "discarding multipath packet during handshake");
3583 return;
3584 }
3585 if remote != self.path_data_mut(path_id).remote {
3586 if let Some(hs) = self.state.as_handshake() {
3587 if hs.allow_server_migration {
3588 trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
3589 self.path_data_mut(path_id).remote = remote;
3590 self.qlog.emit_tuple_assigned(path_id, remote, now);
3591 } else {
3592 debug!("discarding packet with unexpected remote during handshake");
3593 return;
3594 }
3595 } else {
3596 debug!("discarding packet with unexpected remote during handshake");
3597 return;
3598 }
3599 }
3600 }
3601
3602 let was_closed = self.state.is_closed();
3603 let was_drained = self.state.is_drained();
3604
3605 let decrypted = match packet {
3606 None => Err(None),
3607 Some(mut packet) => self
3608 .decrypt_packet(now, path_id, &mut packet)
3609 .map(move |number| (packet, number)),
3610 };
3611 let result = match decrypted {
3612 _ if stateless_reset => {
3613 debug!("got stateless reset");
3614 Err(ConnectionError::Reset)
3615 }
3616 Err(Some(e)) => {
3617 warn!("illegal packet: {}", e);
3618 Err(e.into())
3619 }
3620 Err(None) => {
3621 debug!("failed to authenticate packet");
3622 self.authentication_failures += 1;
3623 let integrity_limit = self.spaces[self.highest_space]
3624 .crypto
3625 .as_ref()
3626 .unwrap()
3627 .packet
3628 .local
3629 .integrity_limit();
3630 if self.authentication_failures > integrity_limit {
3631 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3632 } else {
3633 return;
3634 }
3635 }
3636 Ok((packet, number)) => {
3637 qlog.header(&packet.header, number, path_id);
3638 let span = match number {
3639 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3640 None => trace_span!("recv", space = ?packet.header.space()),
3641 };
3642 let _guard = span.enter();
3643
3644 let dedup = self.spaces[packet.header.space()]
3645 .path_space_mut(path_id)
3646 .map(|pns| &mut pns.dedup);
3647 if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3648 debug!("discarding possible duplicate packet");
3649 self.qlog.emit_packet_received(qlog, now);
3650 return;
3651 } else if self.state.is_handshake() && packet.header.is_short() {
3652 trace!("dropping short packet during handshake");
3654 self.qlog.emit_packet_received(qlog, now);
3655 return;
3656 } else {
3657 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3658 if let Some(hs) = self.state.as_handshake() {
3659 if self.side.is_server() && token != &hs.expected_token {
3660 warn!("discarding Initial with invalid retry token");
3664 self.qlog.emit_packet_received(qlog, now);
3665 return;
3666 }
3667 }
3668 }
3669
3670 if !self.state.is_closed() {
3671 let spin = match packet.header {
3672 Header::Short { spin, .. } => spin,
3673 _ => false,
3674 };
3675
3676 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3677 self.ensure_path(path_id, remote, now, number);
3679 }
3680 if self.paths.contains_key(&path_id) {
3681 self.on_packet_authenticated(
3682 now,
3683 packet.header.space(),
3684 path_id,
3685 ecn,
3686 number,
3687 spin,
3688 packet.header.is_1rtt(),
3689 );
3690 }
3691 }
3692
3693 let res = self
3694 .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);
3695
3696 self.qlog.emit_packet_received(qlog, now);
3697 res
3698 }
3699 }
3700 };
3701
3702 if let Err(conn_err) = result {
3704 match conn_err {
3705 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3706 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3707 ConnectionError::Reset
3708 | ConnectionError::TransportError(TransportError {
3709 code: TransportErrorCode::AEAD_LIMIT_REACHED,
3710 ..
3711 }) => {
3712 self.state.move_to_drained(Some(conn_err));
3713 }
3714 ConnectionError::TimedOut => {
3715 unreachable!("timeouts aren't generated by packet processing");
3716 }
3717 ConnectionError::TransportError(err) => {
3718 debug!("closing connection due to transport error: {}", err);
3719 self.state.move_to_closed(err);
3720 }
3721 ConnectionError::VersionMismatch => {
3722 self.state.move_to_draining(Some(conn_err));
3723 }
3724 ConnectionError::LocallyClosed => {
3725 unreachable!("LocallyClosed isn't generated by packet processing");
3726 }
3727 ConnectionError::CidsExhausted => {
3728 unreachable!("CidsExhausted isn't generated by packet processing");
3729 }
3730 };
3731 }
3732
3733 if !was_closed && self.state.is_closed() {
3734 self.close_common();
3735 if !self.state.is_drained() {
3736 self.set_close_timer(now);
3737 }
3738 }
3739 if !was_drained && self.state.is_drained() {
3740 self.endpoint_events.push_back(EndpointEventInner::Drained);
3741 self.timers
3744 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3745 }
3746
3747 if matches!(self.state.as_type(), StateType::Closed) {
3749 let path_remote = self
3753 .paths
3754 .get(&path_id)
3755 .map(|p| p.data.remote)
3756 .unwrap_or(remote);
3757 self.close = remote == path_remote;
3758 }
3759 }
3760
3761 fn process_decrypted_packet(
3762 &mut self,
3763 now: Instant,
3764 remote: SocketAddr,
3765 path_id: PathId,
3766 number: Option<u64>,
3767 packet: Packet,
3768 qlog: &mut QlogRecvPacket,
3769 ) -> Result<(), ConnectionError> {
3770 if !self.paths.contains_key(&path_id) {
3771 trace!(%path_id, ?number, "discarding packet for unknown path");
3775 return Ok(());
3776 }
3777 let state = match self.state.as_type() {
3778 StateType::Established => {
3779 match packet.header.space() {
3780 SpaceId::Data => {
3781 self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
3782 }
3783 _ if packet.header.has_frames() => {
3784 self.process_early_payload(now, path_id, packet, qlog)?
3785 }
3786 _ => {
3787 trace!("discarding unexpected pre-handshake packet");
3788 }
3789 }
3790 return Ok(());
3791 }
3792 StateType::Closed => {
3793 for result in frame::Iter::new(packet.payload.freeze())? {
3794 let frame = match result {
3795 Ok(frame) => frame,
3796 Err(err) => {
3797 debug!("frame decoding error: {err:?}");
3798 continue;
3799 }
3800 };
3801 qlog.frame(&frame);
3802
3803 if let Frame::Padding = frame {
3804 continue;
3805 };
3806
3807 self.stats.frame_rx.record(&frame);
3808
3809 if let Frame::Close(_error) = frame {
3810 trace!("draining");
3811 self.state.move_to_draining(None);
3812 break;
3813 }
3814 }
3815 return Ok(());
3816 }
3817 StateType::Draining | StateType::Drained => return Ok(()),
3818 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3819 };
3820
3821 match packet.header {
3822 Header::Retry {
3823 src_cid: rem_cid, ..
3824 } => {
3825 debug_assert_eq!(path_id, PathId::ZERO);
3826 if self.side.is_server() {
3827 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3828 }
3829
3830 let is_valid_retry = self
3831 .rem_cids
3832 .get(&path_id)
3833 .map(|cids| cids.active())
3834 .map(|orig_dst_cid| {
3835 self.crypto.is_valid_retry(
3836 orig_dst_cid,
3837 &packet.header_data,
3838 &packet.payload,
3839 )
3840 })
3841 .unwrap_or_default();
3842 if self.total_authed_packets > 1
3843 || packet.payload.len() <= 16 || !is_valid_retry
3845 {
3846 trace!("discarding invalid Retry");
3847 return Ok(());
3855 }
3856
3857 trace!("retrying with CID {}", rem_cid);
3858 let client_hello = state.client_hello.take().unwrap();
3859 self.retry_src_cid = Some(rem_cid);
3860 self.rem_cids
3861 .get_mut(&path_id)
3862 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3863 .update_initial_cid(rem_cid);
3864 self.rem_handshake_cid = rem_cid;
3865
3866 let space = &mut self.spaces[SpaceId::Initial];
3867 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3868 self.on_packet_acked(now, PathId::ZERO, info);
3869 };
3870
3871 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = {
3874 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3875 space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3876 space.crypto_offset = client_hello.len() as u64;
3877 space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3878 .for_path(path_id)
3879 .next_packet_number;
3880 space.pending.crypto.push_back(frame::Crypto {
3881 offset: 0,
3882 data: client_hello,
3883 });
3884 space
3885 };
3886
3887 let zero_rtt = mem::take(
3889 &mut self.spaces[SpaceId::Data]
3890 .for_path(PathId::ZERO)
3891 .sent_packets,
3892 );
3893 for (_, info) in zero_rtt.into_iter() {
3894 self.paths
3895 .get_mut(&PathId::ZERO)
3896 .unwrap()
3897 .remove_in_flight(&info);
3898 self.spaces[SpaceId::Data].pending |= info.retransmits;
3899 }
3900 self.streams.retransmit_all_for_0rtt();
3901
3902 let token_len = packet.payload.len() - 16;
3903 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3904 unreachable!("we already short-circuited if we're server");
3905 };
3906 *token = packet.payload.freeze().split_to(token_len);
3907
3908 self.state = State::handshake(state::Handshake {
3909 expected_token: Bytes::new(),
3910 rem_cid_set: false,
3911 client_hello: None,
3912 allow_server_migration: true,
3913 });
3914 Ok(())
3915 }
3916 Header::Long {
3917 ty: LongType::Handshake,
3918 src_cid: rem_cid,
3919 dst_cid: loc_cid,
3920 ..
3921 } => {
3922 debug_assert_eq!(path_id, PathId::ZERO);
3923 if rem_cid != self.rem_handshake_cid {
3924 debug!(
3925 "discarding packet with mismatched remote CID: {} != {}",
3926 self.rem_handshake_cid, rem_cid
3927 );
3928 return Ok(());
3929 }
3930 self.on_path_validated(path_id);
3931
3932 self.process_early_payload(now, path_id, packet, qlog)?;
3933 if self.state.is_closed() {
3934 return Ok(());
3935 }
3936
3937 if self.crypto.is_handshaking() {
3938 trace!("handshake ongoing");
3939 return Ok(());
3940 }
3941
3942 if self.side.is_client() {
3943 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3945 TransportError::new(
3946 TransportErrorCode::crypto(0x6d),
3947 "transport parameters missing".to_owned(),
3948 )
3949 })?;
3950
3951 if self.has_0rtt() {
3952 if !self.crypto.early_data_accepted().unwrap() {
3953 debug_assert!(self.side.is_client());
3954 debug!("0-RTT rejected");
3955 self.accepted_0rtt = false;
3956 self.streams.zero_rtt_rejected();
3957
3958 self.spaces[SpaceId::Data].pending = Retransmits::default();
3960
3961 let sent_packets = mem::take(
3963 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3964 );
3965 for (_, packet) in sent_packets.into_iter() {
3966 self.paths
3967 .get_mut(&path_id)
3968 .unwrap()
3969 .remove_in_flight(&packet);
3970 }
3971 } else {
3972 self.accepted_0rtt = true;
3973 params.validate_resumption_from(&self.peer_params)?;
3974 }
3975 }
3976 if let Some(token) = params.stateless_reset_token {
3977 let remote = self.path_data(path_id).remote;
3978 self.endpoint_events
3979 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
3980 }
3981 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
3982 self.issue_first_cids(now);
3983 } else {
3984 self.spaces[SpaceId::Data].pending.handshake_done = true;
3986 self.discard_space(now, SpaceId::Handshake);
3987 self.events.push_back(Event::HandshakeConfirmed);
3988 trace!("handshake confirmed");
3989 }
3990
3991 self.events.push_back(Event::Connected);
3992 self.state.move_to_established();
3993 trace!("established");
3994
3995 self.issue_first_path_cids(now);
3998 Ok(())
3999 }
4000 Header::Initial(InitialHeader {
4001 src_cid: rem_cid,
4002 dst_cid: loc_cid,
4003 ..
4004 }) => {
4005 debug_assert_eq!(path_id, PathId::ZERO);
4006 if !state.rem_cid_set {
4007 trace!("switching remote CID to {}", rem_cid);
4008 let mut state = state.clone();
4009 self.rem_cids
4010 .get_mut(&path_id)
4011 .expect("PathId::ZERO not yet abandoned")
4012 .update_initial_cid(rem_cid);
4013 self.rem_handshake_cid = rem_cid;
4014 self.orig_rem_cid = rem_cid;
4015 state.rem_cid_set = true;
4016 self.state.move_to_handshake(state);
4017 } else if rem_cid != self.rem_handshake_cid {
4018 debug!(
4019 "discarding packet with mismatched remote CID: {} != {}",
4020 self.rem_handshake_cid, rem_cid
4021 );
4022 return Ok(());
4023 }
4024
4025 let starting_space = self.highest_space;
4026 self.process_early_payload(now, path_id, packet, qlog)?;
4027
4028 if self.side.is_server()
4029 && starting_space == SpaceId::Initial
4030 && self.highest_space != SpaceId::Initial
4031 {
4032 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4033 TransportError::new(
4034 TransportErrorCode::crypto(0x6d),
4035 "transport parameters missing".to_owned(),
4036 )
4037 })?;
4038 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4039 self.issue_first_cids(now);
4040 self.init_0rtt(now);
4041 }
4042 Ok(())
4043 }
4044 Header::Long {
4045 ty: LongType::ZeroRtt,
4046 ..
4047 } => {
4048 self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
4049 Ok(())
4050 }
4051 Header::VersionNegotiate { .. } => {
4052 if self.total_authed_packets > 1 {
4053 return Ok(());
4054 }
4055 let supported = packet
4056 .payload
4057 .chunks(4)
4058 .any(|x| match <[u8; 4]>::try_from(x) {
4059 Ok(version) => self.version == u32::from_be_bytes(version),
4060 Err(_) => false,
4061 });
4062 if supported {
4063 return Ok(());
4064 }
4065 debug!("remote doesn't support our version");
4066 Err(ConnectionError::VersionMismatch)
4067 }
4068 Header::Short { .. } => unreachable!(
4069 "short packets received during handshake are discarded in handle_packet"
4070 ),
4071 }
4072 }
4073
4074 fn process_early_payload(
4076 &mut self,
4077 now: Instant,
4078 path_id: PathId,
4079 packet: Packet,
4080 #[allow(unused)] qlog: &mut QlogRecvPacket,
4081 ) -> Result<(), TransportError> {
4082 debug_assert_ne!(packet.header.space(), SpaceId::Data);
4083 debug_assert_eq!(path_id, PathId::ZERO);
4084 let payload_len = packet.payload.len();
4085 let mut ack_eliciting = false;
4086 for result in frame::Iter::new(packet.payload.freeze())? {
4087 let frame = result?;
4088 qlog.frame(&frame);
4089 let span = match frame {
4090 Frame::Padding => continue,
4091 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4092 };
4093
4094 self.stats.frame_rx.record(&frame);
4095
4096 let _guard = span.as_ref().map(|x| x.enter());
4097 ack_eliciting |= frame.is_ack_eliciting();
4098
4099 if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4101 return Err(TransportError::PROTOCOL_VIOLATION(
4102 "illegal frame type in handshake",
4103 ));
4104 }
4105
4106 match frame {
4107 Frame::Padding | Frame::Ping => {}
4108 Frame::Crypto(frame) => {
4109 self.read_crypto(packet.header.space(), &frame, payload_len)?;
4110 }
4111 Frame::Ack(ack) => {
4112 self.on_ack_received(now, packet.header.space(), ack)?;
4113 }
4114 Frame::PathAck(ack) => {
4115 span.as_ref()
4116 .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4117 self.on_path_ack_received(now, packet.header.space(), ack)?;
4118 }
4119 Frame::Close(reason) => {
4120 self.state.move_to_draining(Some(reason.into()));
4121 return Ok(());
4122 }
4123 _ => {
4124 let mut err =
4125 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4126 err.frame = Some(frame.ty());
4127 return Err(err);
4128 }
4129 }
4130 }
4131
4132 if ack_eliciting {
4133 self.spaces[packet.header.space()]
4135 .for_path(path_id)
4136 .pending_acks
4137 .set_immediate_ack_required();
4138 }
4139
4140 self.write_crypto();
4141 Ok(())
4142 }
4143
4144 fn process_payload(
4146 &mut self,
4147 now: Instant,
4148 remote: SocketAddr,
4149 path_id: PathId,
4150 number: u64,
4151 packet: Packet,
4152 #[allow(unused)] qlog: &mut QlogRecvPacket,
4153 ) -> Result<(), TransportError> {
4154 let payload = packet.payload.freeze();
4155 let mut is_probing_packet = true;
4156 let mut close = None;
4157 let payload_len = payload.len();
4158 let mut ack_eliciting = false;
4159 let mut migration_observed_addr = None;
4162 for result in frame::Iter::new(payload)? {
4163 let frame = result?;
4164 qlog.frame(&frame);
4165 let span = match frame {
4166 Frame::Padding => continue,
4167 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4168 };
4169
4170 self.stats.frame_rx.record(&frame);
4171 match &frame {
4174 Frame::Crypto(f) => {
4175 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4176 }
4177 Frame::Stream(f) => {
4178 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4179 }
4180 Frame::Datagram(f) => {
4181 trace!(len = f.data.len(), "got datagram frame");
4182 }
4183 f => {
4184 trace!("got frame {:?}", f);
4185 }
4186 }
4187
4188 let _guard = span.enter();
4189 if packet.header.is_0rtt() {
4190 match frame {
4191 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4192 return Err(TransportError::PROTOCOL_VIOLATION(
4193 "illegal frame type in 0-RTT",
4194 ));
4195 }
4196 _ => {
4197 if frame.is_1rtt() {
4198 return Err(TransportError::PROTOCOL_VIOLATION(
4199 "illegal frame type in 0-RTT",
4200 ));
4201 }
4202 }
4203 }
4204 }
4205 ack_eliciting |= frame.is_ack_eliciting();
4206
4207 match frame {
4209 Frame::Padding
4210 | Frame::PathChallenge(_)
4211 | Frame::PathResponse(_)
4212 | Frame::NewConnectionId(_)
4213 | Frame::ObservedAddr(_) => {}
4214 _ => {
4215 is_probing_packet = false;
4216 }
4217 }
4218
4219 match frame {
4220 Frame::Crypto(frame) => {
4221 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4222 }
4223 Frame::Stream(frame) => {
4224 if self.streams.received(frame, payload_len)?.should_transmit() {
4225 self.spaces[SpaceId::Data].pending.max_data = true;
4226 }
4227 }
4228 Frame::Ack(ack) => {
4229 self.on_ack_received(now, SpaceId::Data, ack)?;
4230 }
4231 Frame::PathAck(ack) => {
4232 span.record("path", tracing::field::debug(&ack.path_id));
4233 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4234 }
4235 Frame::Padding | Frame::Ping => {}
4236 Frame::Close(reason) => {
4237 close = Some(reason);
4238 }
4239 Frame::PathChallenge(challenge) => {
4240 let path = &mut self
4241 .path_mut(path_id)
4242 .expect("payload is processed only after the path becomes known");
4243 path.path_responses.push(number, challenge.0, remote);
4244 if remote == path.remote {
4245 match self.peer_supports_ack_frequency() {
4255 true => self.immediate_ack(path_id),
4256 false => {
4257 self.ping_path(path_id).ok();
4258 }
4259 }
4260 }
4261 }
4262 Frame::PathResponse(response) => {
4263 let path = self
4264 .paths
4265 .get_mut(&path_id)
4266 .expect("payload is processed only after the path becomes known");
4267
4268 use PathTimer::*;
4269 use paths::OnPathResponseReceived::*;
4270 match path.data.on_path_response_received(now, response.0, remote) {
4271 OnPath { was_open } => {
4272 let qlog = self.qlog.with_time(now);
4273
4274 self.timers
4275 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4276 self.timers
4277 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4278
4279 let next_challenge = path
4280 .data
4281 .earliest_expiring_challenge()
4282 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4283 self.timers.set_or_stop(
4284 Timer::PerPath(path_id, PathChallengeLost),
4285 next_challenge,
4286 qlog,
4287 );
4288
4289 if !was_open {
4290 self.events
4291 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4292 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4293 {
4294 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4295 id: path_id,
4296 addr: observed.socket_addr(),
4297 }));
4298 }
4299 }
4300 if let Some((_, ref mut prev)) = path.prev {
4301 prev.challenges_sent.clear();
4302 prev.send_new_challenge = false;
4303 }
4304 }
4305 OffPath => {
4306 debug!("Response to off-path PathChallenge!");
4307 let next_challenge = path
4308 .data
4309 .earliest_expiring_challenge()
4310 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4311 self.timers.set_or_stop(
4312 Timer::PerPath(path_id, PathChallengeLost),
4313 next_challenge,
4314 self.qlog.with_time(now),
4315 );
4316 }
4317 Invalid { expected } => {
4318 debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE")
4319 }
4320 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4321 }
4322 }
4323 Frame::MaxData(bytes) => {
4324 self.streams.received_max_data(bytes);
4325 }
4326 Frame::MaxStreamData { id, offset } => {
4327 self.streams.received_max_stream_data(id, offset)?;
4328 }
4329 Frame::MaxStreams { dir, count } => {
4330 self.streams.received_max_streams(dir, count)?;
4331 }
4332 Frame::ResetStream(frame) => {
4333 if self.streams.received_reset(frame)?.should_transmit() {
4334 self.spaces[SpaceId::Data].pending.max_data = true;
4335 }
4336 }
4337 Frame::DataBlocked { offset } => {
4338 debug!(offset, "peer claims to be blocked at connection level");
4339 }
4340 Frame::StreamDataBlocked { id, offset } => {
4341 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4342 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4343 return Err(TransportError::STREAM_STATE_ERROR(
4344 "STREAM_DATA_BLOCKED on send-only stream",
4345 ));
4346 }
4347 debug!(
4348 stream = %id,
4349 offset, "peer claims to be blocked at stream level"
4350 );
4351 }
4352 Frame::StreamsBlocked { dir, limit } => {
4353 if limit > MAX_STREAM_COUNT {
4354 return Err(TransportError::FRAME_ENCODING_ERROR(
4355 "unrepresentable stream limit",
4356 ));
4357 }
4358 debug!(
4359 "peer claims to be blocked opening more than {} {} streams",
4360 limit, dir
4361 );
4362 }
4363 Frame::StopSending(frame::StopSending { id, error_code }) => {
4364 if id.initiator() != self.side.side() {
4365 if id.dir() == Dir::Uni {
4366 debug!("got STOP_SENDING on recv-only {}", id);
4367 return Err(TransportError::STREAM_STATE_ERROR(
4368 "STOP_SENDING on recv-only stream",
4369 ));
4370 }
4371 } else if self.streams.is_local_unopened(id) {
4372 return Err(TransportError::STREAM_STATE_ERROR(
4373 "STOP_SENDING on unopened stream",
4374 ));
4375 }
4376 self.streams.received_stop_sending(id, error_code);
4377 }
4378 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4379 if let Some(ref path_id) = path_id {
4380 span.record("path", tracing::field::debug(&path_id));
4381 }
4382 let path_id = path_id.unwrap_or_default();
4383 match self.local_cid_state.get_mut(&path_id) {
4384 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4385 Some(cid_state) => {
4386 let allow_more_cids = cid_state
4387 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4388
4389 let has_path = !self.abandoned_paths.contains(&path_id);
4393 let allow_more_cids = allow_more_cids && has_path;
4394
4395 self.endpoint_events
4396 .push_back(EndpointEventInner::RetireConnectionId(
4397 now,
4398 path_id,
4399 sequence,
4400 allow_more_cids,
4401 ));
4402 }
4403 }
4404 }
4405 Frame::NewConnectionId(frame) => {
4406 let path_id = if let Some(path_id) = frame.path_id {
4407 if !self.is_multipath_negotiated() {
4408 return Err(TransportError::PROTOCOL_VIOLATION(
4409 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4410 ));
4411 }
4412 if path_id > self.local_max_path_id {
4413 return Err(TransportError::PROTOCOL_VIOLATION(
4414 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4415 ));
4416 }
4417 path_id
4418 } else {
4419 PathId::ZERO
4420 };
4421
4422 if self.abandoned_paths.contains(&path_id) {
4423 trace!("ignoring issued CID for abandoned path");
4424 continue;
4425 }
4426 if let Some(ref path_id) = frame.path_id {
4427 span.record("path", tracing::field::debug(&path_id));
4428 }
4429 let rem_cids = self
4430 .rem_cids
4431 .entry(path_id)
4432 .or_insert_with(|| CidQueue::new(frame.id));
4433 if rem_cids.active().is_empty() {
4434 return Err(TransportError::PROTOCOL_VIOLATION(
4436 "NEW_CONNECTION_ID when CIDs aren't in use",
4437 ));
4438 }
4439 if frame.retire_prior_to > frame.sequence {
4440 return Err(TransportError::PROTOCOL_VIOLATION(
4441 "NEW_CONNECTION_ID retiring unissued CIDs",
4442 ));
4443 }
4444
4445 use crate::cid_queue::InsertError;
4446 match rem_cids.insert(frame) {
4447 Ok(None) => {}
4448 Ok(Some((retired, reset_token))) => {
4449 let pending_retired =
4450 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4451 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4454 if (pending_retired.len() as u64)
4457 .saturating_add(retired.end.saturating_sub(retired.start))
4458 > MAX_PENDING_RETIRED_CIDS
4459 {
4460 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4461 "queued too many retired CIDs",
4462 ));
4463 }
4464 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4465 self.set_reset_token(path_id, remote, reset_token);
4466 }
4467 Err(InsertError::ExceedsLimit) => {
4468 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4469 }
4470 Err(InsertError::Retired) => {
4471 trace!("discarding already-retired");
4472 self.spaces[SpaceId::Data]
4476 .pending
4477 .retire_cids
4478 .push((path_id, frame.sequence));
4479 continue;
4480 }
4481 };
4482
4483 if self.side.is_server()
4484 && path_id == PathId::ZERO
4485 && self
4486 .rem_cids
4487 .get(&PathId::ZERO)
4488 .map(|cids| cids.active_seq() == 0)
4489 .unwrap_or_default()
4490 {
4491 self.update_rem_cid(PathId::ZERO);
4494 }
4495 }
4496 Frame::NewToken(NewToken { token }) => {
4497 let ConnectionSide::Client {
4498 token_store,
4499 server_name,
4500 ..
4501 } = &self.side
4502 else {
4503 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4504 };
4505 if token.is_empty() {
4506 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4507 }
4508 trace!("got new token");
4509 token_store.insert(server_name, token);
4510 }
4511 Frame::Datagram(datagram) => {
4512 if self
4513 .datagrams
4514 .received(datagram, &self.config.datagram_receive_buffer_size)?
4515 {
4516 self.events.push_back(Event::DatagramReceived);
4517 }
4518 }
4519 Frame::AckFrequency(ack_frequency) => {
4520 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4523 continue;
4526 }
4527
4528 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4530 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4531
4532 if let Some(timeout) = space
4535 .pending_acks
4536 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4537 {
4538 self.timers.set(
4539 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4540 timeout,
4541 self.qlog.with_time(now),
4542 );
4543 }
4544 }
4545 }
4546 Frame::ImmediateAck => {
4547 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4549 pns.pending_acks.set_immediate_ack_required();
4550 }
4551 }
4552 Frame::HandshakeDone => {
4553 if self.side.is_server() {
4554 return Err(TransportError::PROTOCOL_VIOLATION(
4555 "client sent HANDSHAKE_DONE",
4556 ));
4557 }
4558 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4559 self.discard_space(now, SpaceId::Handshake);
4560 }
4561 self.events.push_back(Event::HandshakeConfirmed);
4562 trace!("handshake confirmed");
4563 }
4564 Frame::ObservedAddr(observed) => {
4565 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4567 if !self
4568 .peer_params
4569 .address_discovery_role
4570 .should_report(&self.config.address_discovery_role)
4571 {
4572 return Err(TransportError::PROTOCOL_VIOLATION(
4573 "received OBSERVED_ADDRESS frame when not negotiated",
4574 ));
4575 }
4576 if packet.header.space() != SpaceId::Data {
4578 return Err(TransportError::PROTOCOL_VIOLATION(
4579 "OBSERVED_ADDRESS frame outside data space",
4580 ));
4581 }
4582
4583 let path = self.path_data_mut(path_id);
4584 if remote == path.remote {
4585 if let Some(updated) = path.update_observed_addr_report(observed) {
4586 if path.open {
4587 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4588 id: path_id,
4589 addr: updated,
4590 }));
4591 }
4592 }
4594 } else {
4595 migration_observed_addr = Some(observed)
4597 }
4598 }
4599 Frame::PathAbandon(frame::PathAbandon {
4600 path_id,
4601 error_code,
4602 }) => {
4603 span.record("path", tracing::field::debug(&path_id));
4604 let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4606 Ok(()) => {
4607 trace!("peer abandoned path");
4608 false
4609 }
4610 Err(ClosePathError::LastOpenPath) => {
4611 trace!("peer abandoned last path, closing connection");
4612 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4614 }
4615 Err(ClosePathError::ClosedPath) => {
4616 trace!("peer abandoned already closed path");
4617 true
4618 }
4619 };
4620 if self.path(path_id).is_some() && !already_abandoned {
4625 let delay = self.pto(SpaceId::Data, path_id) * 3;
4630 self.timers.set(
4631 Timer::PerPath(path_id, PathTimer::DiscardPath),
4632 now + delay,
4633 self.qlog.with_time(now),
4634 );
4635 }
4636 }
4637 Frame::PathStatusAvailable(info) => {
4638 span.record("path", tracing::field::debug(&info.path_id));
4639 if self.is_multipath_negotiated() {
4640 self.on_path_status(
4641 info.path_id,
4642 PathStatus::Available,
4643 info.status_seq_no,
4644 );
4645 } else {
4646 return Err(TransportError::PROTOCOL_VIOLATION(
4647 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4648 ));
4649 }
4650 }
4651 Frame::PathStatusBackup(info) => {
4652 span.record("path", tracing::field::debug(&info.path_id));
4653 if self.is_multipath_negotiated() {
4654 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4655 } else {
4656 return Err(TransportError::PROTOCOL_VIOLATION(
4657 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4658 ));
4659 }
4660 }
4661 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4662 span.record("path", tracing::field::debug(&path_id));
4663 if !self.is_multipath_negotiated() {
4664 return Err(TransportError::PROTOCOL_VIOLATION(
4665 "received MAX_PATH_ID frame when multipath was not negotiated",
4666 ));
4667 }
4668 if path_id > self.remote_max_path_id {
4670 self.remote_max_path_id = path_id;
4671 self.issue_first_path_cids(now);
4672 }
4673 }
4674 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4675 if self.is_multipath_negotiated() {
4679 if self.local_max_path_id > max_path_id {
4680 return Err(TransportError::PROTOCOL_VIOLATION(
4681 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4682 ));
4683 }
4684 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4685 } else {
4687 return Err(TransportError::PROTOCOL_VIOLATION(
4688 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4689 ));
4690 }
4691 }
4692 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4693 if self.is_multipath_negotiated() {
4701 if path_id > self.local_max_path_id {
4702 return Err(TransportError::PROTOCOL_VIOLATION(
4703 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4704 ));
4705 }
4706 if next_seq.0
4707 > self
4708 .local_cid_state
4709 .get(&path_id)
4710 .map(|cid_state| cid_state.active_seq().1 + 1)
4711 .unwrap_or_default()
4712 {
4713 return Err(TransportError::PROTOCOL_VIOLATION(
4714 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4715 ));
4716 }
4717 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4718 } else {
4719 return Err(TransportError::PROTOCOL_VIOLATION(
4720 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4721 ));
4722 }
4723 }
4724 Frame::AddAddress(addr) => {
4725 let client_state = match self.iroh_hp.client_side_mut() {
4726 Ok(state) => state,
4727 Err(err) => {
4728 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4729 "Nat traversal(ADD_ADDRESS): {err}"
4730 )));
4731 }
4732 };
4733
4734 if !client_state.check_remote_address(&addr) {
4735 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4737 }
4738
4739 match client_state.add_remote_address(addr) {
4740 Ok(maybe_added) => {
4741 if let Some(added) = maybe_added {
4742 self.events.push_back(Event::NatTraversal(
4743 iroh_hp::Event::AddressAdded(added),
4744 ));
4745 }
4746 }
4747 Err(e) => {
4748 warn!(%e, "failed to add remote address")
4749 }
4750 }
4751 }
4752 Frame::RemoveAddress(addr) => {
4753 let client_state = match self.iroh_hp.client_side_mut() {
4754 Ok(state) => state,
4755 Err(err) => {
4756 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4757 "Nat traversal(REMOVE_ADDRESS): {err}"
4758 )));
4759 }
4760 };
4761 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4762 self.events
4763 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4764 removed_addr,
4765 )));
4766 }
4767 }
4768 Frame::ReachOut(reach_out) => {
4769 let server_state = match self.iroh_hp.server_side_mut() {
4770 Ok(state) => state,
4771 Err(err) => {
4772 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4773 "Nat traversal(REACH_OUT): {err}"
4774 )));
4775 }
4776 };
4777
4778 if let Err(err) = server_state.handle_reach_out(reach_out) {
4779 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4780 "Nat traversal(REACH_OUT): {err}"
4781 )));
4782 }
4783 }
4784 }
4785 }
4786
4787 let space = self.spaces[SpaceId::Data].for_path(path_id);
4788 if space
4789 .pending_acks
4790 .packet_received(now, number, ack_eliciting, &space.dedup)
4791 {
4792 if self.abandoned_paths.contains(&path_id) {
4793 space.pending_acks.set_immediate_ack_required();
4796 } else {
4797 self.timers.set(
4798 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4799 now + self.ack_frequency.max_ack_delay,
4800 self.qlog.with_time(now),
4801 );
4802 }
4803 }
4804
4805 let pending = &mut self.spaces[SpaceId::Data].pending;
4810 self.streams.queue_max_stream_id(pending);
4811
4812 if let Some(reason) = close {
4813 self.state.move_to_draining(Some(reason.into()));
4814 self.close = true;
4815 }
4816
4817 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4818 && !is_probing_packet
4819 && remote != self.path_data(path_id).remote
4820 {
4821 let ConnectionSide::Server { ref server_config } = self.side else {
4822 panic!("packets from unknown remote should be dropped by clients");
4823 };
4824 debug_assert!(
4825 server_config.migration,
4826 "migration-initiating packets should have been dropped immediately"
4827 );
4828 self.migrate(path_id, now, remote, migration_observed_addr);
4829 self.update_rem_cid(path_id);
4831 self.spin = false;
4832 }
4833
4834 Ok(())
4835 }
4836
4837 fn migrate(
4838 &mut self,
4839 path_id: PathId,
4840 now: Instant,
4841 remote: SocketAddr,
4842 observed_addr: Option<ObservedAddr>,
4843 ) {
4844 trace!(%remote, %path_id, "migration initiated");
4845 self.path_counter = self.path_counter.wrapping_add(1);
4846 let prev_pto = self.pto(SpaceId::Data, path_id);
4853 let known_path = self.paths.get_mut(&path_id).expect("known path");
4854 let path = &mut known_path.data;
4855 let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4856 PathData::from_previous(remote, path, self.path_counter, now)
4857 } else {
4858 let peer_max_udp_payload_size =
4859 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4860 .unwrap_or(u16::MAX);
4861 PathData::new(
4862 remote,
4863 self.allow_mtud,
4864 Some(peer_max_udp_payload_size),
4865 self.path_counter,
4866 now,
4867 &self.config,
4868 )
4869 };
4870 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4871 if let Some(report) = observed_addr {
4872 if let Some(updated) = new_path.update_observed_addr_report(report) {
4873 tracing::info!("adding observed addr event from migration");
4874 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4875 id: path_id,
4876 addr: updated,
4877 }));
4878 }
4879 }
4880 new_path.send_new_challenge = true;
4881
4882 let mut prev = mem::replace(path, new_path);
4883 if !prev.is_validating_path() {
4885 prev.send_new_challenge = true;
4886 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4890 }
4891
4892 self.qlog.emit_tuple_assigned(path_id, remote, now);
4894
4895 self.timers.set(
4896 Timer::PerPath(path_id, PathTimer::PathValidation),
4897 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4898 self.qlog.with_time(now),
4899 );
4900 }
4901
4902 pub fn local_address_changed(&mut self) {
4904 self.update_rem_cid(PathId::ZERO);
4906 self.ping();
4907 }
4908
4909 fn update_rem_cid(&mut self, path_id: PathId) {
4911 let Some((reset_token, retired)) =
4912 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4913 else {
4914 return;
4915 };
4916
4917 self.spaces[SpaceId::Data]
4919 .pending
4920 .retire_cids
4921 .extend(retired.map(|seq| (path_id, seq)));
4922 let remote = self.path_data(path_id).remote;
4923 self.set_reset_token(path_id, remote, reset_token);
4924 }
4925
4926 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4935 self.endpoint_events
4936 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4937
4938 if path_id == PathId::ZERO {
4944 self.peer_params.stateless_reset_token = Some(reset_token);
4945 }
4946 }
4947
4948 fn issue_first_cids(&mut self, now: Instant) {
4950 if self
4951 .local_cid_state
4952 .get(&PathId::ZERO)
4953 .expect("PathId::ZERO exists when the connection is created")
4954 .cid_len()
4955 == 0
4956 {
4957 return;
4958 }
4959
4960 let mut n = self.peer_params.issue_cids_limit() - 1;
4962 if let ConnectionSide::Server { server_config } = &self.side {
4963 if server_config.has_preferred_address() {
4964 n -= 1;
4966 }
4967 }
4968 self.endpoint_events
4969 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
4970 }
4971
4972 fn issue_first_path_cids(&mut self, now: Instant) {
4976 if let Some(max_path_id) = self.max_path_id() {
4977 let mut path_id = self.max_path_id_with_cids.next();
4978 while path_id <= max_path_id {
4979 self.endpoint_events
4980 .push_back(EndpointEventInner::NeedIdentifiers(
4981 path_id,
4982 now,
4983 self.peer_params.issue_cids_limit(),
4984 ));
4985 path_id = path_id.next();
4986 }
4987 self.max_path_id_with_cids = max_path_id;
4988 }
4989 }
4990
4991 fn populate_packet(
4999 &mut self,
5000 now: Instant,
5001 space_id: SpaceId,
5002 path_id: PathId,
5003 path_exclusive_only: bool,
5004 buf: &mut impl BufMut,
5005 pn: u64,
5006 #[allow(unused)] qlog: &mut QlogSentPacket,
5007 ) -> SentFrames {
5008 let mut sent = SentFrames::default();
5009 let is_multipath_negotiated = self.is_multipath_negotiated();
5010 let space = &mut self.spaces[space_id];
5011 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5012 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5013 space
5014 .for_path(path_id)
5015 .pending_acks
5016 .maybe_ack_non_eliciting();
5017
5018 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5020 trace!("HANDSHAKE_DONE");
5021 buf.write(frame::FrameType::HANDSHAKE_DONE);
5022 qlog.frame(&Frame::HandshakeDone);
5023 sent.retransmits.get_or_create().handshake_done = true;
5024 self.stats.frame_tx.handshake_done =
5026 self.stats.frame_tx.handshake_done.saturating_add(1);
5027 }
5028
5029 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5032 while let Some(local_addr) = addresses.pop() {
5033 let reach_out = frame::ReachOut::new(*round, local_addr);
5034 if buf.remaining_mut() > reach_out.size() {
5035 trace!(%round, ?local_addr, "REACH_OUT");
5036 reach_out.write(buf);
5037 let sent_reachouts = sent
5038 .retransmits
5039 .get_or_create()
5040 .reach_out
5041 .get_or_insert_with(|| (*round, Default::default()));
5042 sent_reachouts.1.push(local_addr);
5043 self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5044 qlog.frame(&Frame::ReachOut(reach_out));
5045 } else {
5046 addresses.push(local_addr);
5047 break;
5048 }
5049 }
5050 if addresses.is_empty() {
5051 space.pending.reach_out = None;
5052 }
5053 }
5054
5055 if !path_exclusive_only
5057 && space_id == SpaceId::Data
5058 && self
5059 .config
5060 .address_discovery_role
5061 .should_report(&self.peer_params.address_discovery_role)
5062 && (!path.observed_addr_sent || space.pending.observed_addr)
5063 {
5064 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5065 if buf.remaining_mut() > frame.size() {
5066 trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5067 frame.write(buf);
5068
5069 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5070 path.observed_addr_sent = true;
5071
5072 self.stats.frame_tx.observed_addr += 1;
5073 sent.retransmits.get_or_create().observed_addr = true;
5074 space.pending.observed_addr = false;
5075 qlog.frame(&Frame::ObservedAddr(frame));
5076 }
5077 }
5078
5079 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5081 trace!("PING");
5082 buf.write(frame::FrameType::PING);
5083 sent.non_retransmits = true;
5084 self.stats.frame_tx.ping += 1;
5085 qlog.frame(&Frame::Ping);
5086 }
5087
5088 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5090 debug_assert_eq!(
5091 space_id,
5092 SpaceId::Data,
5093 "immediate acks must be sent in the data space"
5094 );
5095 trace!("IMMEDIATE_ACK");
5096 buf.write(frame::FrameType::IMMEDIATE_ACK);
5097 sent.non_retransmits = true;
5098 self.stats.frame_tx.immediate_ack += 1;
5099 qlog.frame(&Frame::ImmediateAck);
5100 }
5101
5102 if !path_exclusive_only {
5106 for path_id in space
5107 .number_spaces
5108 .iter_mut()
5109 .filter(|(_, pns)| pns.pending_acks.can_send())
5110 .map(|(&path_id, _)| path_id)
5111 .collect::<Vec<_>>()
5112 {
5113 Self::populate_acks(
5114 now,
5115 self.receiving_ecn,
5116 &mut sent,
5117 path_id,
5118 space_id,
5119 space,
5120 is_multipath_negotiated,
5121 buf,
5122 &mut self.stats,
5123 qlog,
5124 );
5125 }
5126 }
5127
5128 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5130 let sequence_number = self.ack_frequency.next_sequence_number();
5131
5132 let config = self.config.ack_frequency_config.as_ref().unwrap();
5134
5135 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5137 path.rtt.get(),
5138 config,
5139 &self.peer_params,
5140 );
5141
5142 trace!(?max_ack_delay, "ACK_FREQUENCY");
5143
5144 let frame = frame::AckFrequency {
5145 sequence: sequence_number,
5146 ack_eliciting_threshold: config.ack_eliciting_threshold,
5147 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5148 reordering_threshold: config.reordering_threshold,
5149 };
5150 frame.encode(buf);
5151 qlog.frame(&Frame::AckFrequency(frame));
5152
5153 sent.retransmits.get_or_create().ack_frequency = true;
5154
5155 self.ack_frequency
5156 .ack_frequency_sent(path_id, pn, max_ack_delay);
5157 self.stats.frame_tx.ack_frequency += 1;
5158 }
5159
5160 if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5162 && space_id == SpaceId::Data
5163 && path.send_new_challenge
5164 && !self.state.is_closed()
5165 {
5167 path.send_new_challenge = false;
5168
5169 let token = self.rng.random();
5171 let info = paths::SentChallengeInfo {
5172 sent_instant: now,
5173 remote: path.remote,
5174 };
5175 path.challenges_sent.insert(token, info);
5176 sent.non_retransmits = true;
5177 sent.requires_padding = true;
5178 let challenge = frame::PathChallenge(token);
5179 trace!(%challenge, "sending new challenge");
5180 buf.write(challenge);
5181 qlog.frame(&Frame::PathChallenge(challenge));
5182 self.stats.frame_tx.path_challenge += 1;
5183 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5184 self.timers.set(
5185 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5186 now + pto,
5187 self.qlog.with_time(now),
5188 );
5189
5190 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5191 space.pending.path_status.insert(path_id);
5193 }
5194
5195 if space_id == SpaceId::Data
5198 && self
5199 .config
5200 .address_discovery_role
5201 .should_report(&self.peer_params.address_discovery_role)
5202 {
5203 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5204 if buf.remaining_mut() > frame.size() {
5205 frame.write(buf);
5206 qlog.frame(&Frame::ObservedAddr(frame));
5207
5208 self.next_observed_addr_seq_no =
5209 self.next_observed_addr_seq_no.saturating_add(1u8);
5210 path.observed_addr_sent = true;
5211
5212 self.stats.frame_tx.observed_addr += 1;
5213 sent.retransmits.get_or_create().observed_addr = true;
5214 space.pending.observed_addr = false;
5215 }
5216 }
5217 }
5218
5219 if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5221 if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5222 sent.non_retransmits = true;
5223 sent.requires_padding = true;
5224 let response = frame::PathResponse(token);
5225 trace!(%response, "sending response");
5226 buf.write(response);
5227 qlog.frame(&Frame::PathResponse(response));
5228 self.stats.frame_tx.path_response += 1;
5229
5230 if space_id == SpaceId::Data
5234 && self
5235 .config
5236 .address_discovery_role
5237 .should_report(&self.peer_params.address_discovery_role)
5238 {
5239 let frame =
5240 frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5241 if buf.remaining_mut() > frame.size() {
5242 frame.write(buf);
5243 qlog.frame(&Frame::ObservedAddr(frame));
5244
5245 self.next_observed_addr_seq_no =
5246 self.next_observed_addr_seq_no.saturating_add(1u8);
5247 path.observed_addr_sent = true;
5248
5249 self.stats.frame_tx.observed_addr += 1;
5250 sent.retransmits.get_or_create().observed_addr = true;
5251 space.pending.observed_addr = false;
5252 }
5253 }
5254 }
5255 }
5256
5257 while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5259 let mut frame = match space.pending.crypto.pop_front() {
5260 Some(x) => x,
5261 None => break,
5262 };
5263
5264 let max_crypto_data_size = buf.remaining_mut()
5269 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5271 - 2; let len = frame
5274 .data
5275 .len()
5276 .min(2usize.pow(14) - 1)
5277 .min(max_crypto_data_size);
5278
5279 let data = frame.data.split_to(len);
5280 let truncated = frame::Crypto {
5281 offset: frame.offset,
5282 data,
5283 };
5284 trace!(
5285 "CRYPTO: off {} len {}",
5286 truncated.offset,
5287 truncated.data.len()
5288 );
5289 truncated.encode(buf);
5290 self.stats.frame_tx.crypto += 1;
5291
5292 #[cfg(feature = "qlog")]
5294 qlog.frame(&Frame::Crypto(truncated.clone()));
5295 sent.retransmits.get_or_create().crypto.push_back(truncated);
5296 if !frame.data.is_empty() {
5297 frame.offset += len as u64;
5298 space.pending.crypto.push_front(frame);
5299 }
5300 }
5301
5302 while !path_exclusive_only
5305 && space_id == SpaceId::Data
5306 && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5307 {
5308 let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5309 break;
5310 };
5311 let frame = frame::PathAbandon {
5312 path_id,
5313 error_code,
5314 };
5315 frame.encode(buf);
5316 qlog.frame(&Frame::PathAbandon(frame));
5317 self.stats.frame_tx.path_abandon += 1;
5318 trace!(%path_id, "PATH_ABANDON");
5319 sent.retransmits
5320 .get_or_create()
5321 .path_abandon
5322 .entry(path_id)
5323 .or_insert(error_code);
5324 }
5325
5326 while !path_exclusive_only
5328 && space_id == SpaceId::Data
5329 && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5330 {
5331 let Some(path_id) = space.pending.path_status.pop_first() else {
5332 break;
5333 };
5334 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5335 trace!(%path_id, "discarding queued path status for unknown path");
5336 continue;
5337 };
5338
5339 let seq = path.status.seq();
5340 sent.retransmits.get_or_create().path_status.insert(path_id);
5341 match path.local_status() {
5342 PathStatus::Available => {
5343 let frame = frame::PathStatusAvailable {
5344 path_id,
5345 status_seq_no: seq,
5346 };
5347 frame.encode(buf);
5348 qlog.frame(&Frame::PathStatusAvailable(frame));
5349 self.stats.frame_tx.path_status_available += 1;
5350 trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5351 }
5352 PathStatus::Backup => {
5353 let frame = frame::PathStatusBackup {
5354 path_id,
5355 status_seq_no: seq,
5356 };
5357 frame.encode(buf);
5358 qlog.frame(&Frame::PathStatusBackup(frame));
5359 self.stats.frame_tx.path_status_backup += 1;
5360 trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5361 }
5362 }
5363 }
5364
5365 if space_id == SpaceId::Data
5367 && space.pending.max_path_id
5368 && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5369 {
5370 let frame = frame::MaxPathId(self.local_max_path_id);
5371 frame.encode(buf);
5372 qlog.frame(&Frame::MaxPathId(frame));
5373 space.pending.max_path_id = false;
5374 sent.retransmits.get_or_create().max_path_id = true;
5375 trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5376 self.stats.frame_tx.max_path_id += 1;
5377 }
5378
5379 if space_id == SpaceId::Data
5381 && space.pending.paths_blocked
5382 && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5383 {
5384 let frame = frame::PathsBlocked(self.remote_max_path_id);
5385 frame.encode(buf);
5386 qlog.frame(&Frame::PathsBlocked(frame));
5387 space.pending.paths_blocked = false;
5388 sent.retransmits.get_or_create().paths_blocked = true;
5389 trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5390 self.stats.frame_tx.paths_blocked += 1;
5391 }
5392
5393 while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5395 {
5396 let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5397 break;
5398 };
5399 let next_seq = match self.rem_cids.get(&path_id) {
5400 Some(cid_queue) => cid_queue.active_seq() + 1,
5401 None => 0,
5402 };
5403 let frame = frame::PathCidsBlocked {
5404 path_id,
5405 next_seq: VarInt(next_seq),
5406 };
5407 frame.encode(buf);
5408 qlog.frame(&Frame::PathCidsBlocked(frame));
5409 sent.retransmits
5410 .get_or_create()
5411 .path_cids_blocked
5412 .push(path_id);
5413 trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5414 self.stats.frame_tx.path_cids_blocked += 1;
5415 }
5416
5417 if space_id == SpaceId::Data {
5419 self.streams.write_control_frames(
5420 buf,
5421 &mut space.pending,
5422 &mut sent.retransmits,
5423 &mut self.stats.frame_tx,
5424 qlog,
5425 );
5426 }
5427
5428 let cid_len = self
5430 .local_cid_state
5431 .values()
5432 .map(|cid_state| cid_state.cid_len())
5433 .max()
5434 .expect("some local CID state must exist");
5435 let new_cid_size_bound =
5436 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5437 while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5438 let issued = match space.pending.new_cids.pop() {
5439 Some(x) => x,
5440 None => break,
5441 };
5442 let retire_prior_to = self
5443 .local_cid_state
5444 .get(&issued.path_id)
5445 .map(|cid_state| cid_state.retire_prior_to())
5446 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5447
5448 let cid_path_id = match is_multipath_negotiated {
5449 true => {
5450 trace!(
5451 path_id = ?issued.path_id,
5452 sequence = issued.sequence,
5453 id = %issued.id,
5454 "PATH_NEW_CONNECTION_ID",
5455 );
5456 self.stats.frame_tx.path_new_connection_id += 1;
5457 Some(issued.path_id)
5458 }
5459 false => {
5460 trace!(
5461 sequence = issued.sequence,
5462 id = %issued.id,
5463 "NEW_CONNECTION_ID"
5464 );
5465 debug_assert_eq!(issued.path_id, PathId::ZERO);
5466 self.stats.frame_tx.new_connection_id += 1;
5467 None
5468 }
5469 };
5470 let frame = frame::NewConnectionId {
5471 path_id: cid_path_id,
5472 sequence: issued.sequence,
5473 retire_prior_to,
5474 id: issued.id,
5475 reset_token: issued.reset_token,
5476 };
5477 frame.encode(buf);
5478 sent.retransmits.get_or_create().new_cids.push(issued);
5479 qlog.frame(&Frame::NewConnectionId(frame));
5480 }
5481
5482 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5484 while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5485 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5486 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5487 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5488 self.stats.frame_tx.retire_connection_id += 1;
5489 (None, seq)
5490 }
5491 Some((path_id, seq)) => {
5492 trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5493 self.stats.frame_tx.path_retire_connection_id += 1;
5494 (Some(path_id), seq)
5495 }
5496 None => break,
5497 };
5498 let frame = frame::RetireConnectionId { path_id, sequence };
5499 frame.encode(buf);
5500 qlog.frame(&Frame::RetireConnectionId(frame));
5501 sent.retransmits
5502 .get_or_create()
5503 .retire_cids
5504 .push((path_id.unwrap_or_default(), sequence));
5505 }
5506
5507 let mut sent_datagrams = false;
5509 while !path_exclusive_only
5510 && buf.remaining_mut() > Datagram::SIZE_BOUND
5511 && space_id == SpaceId::Data
5512 {
5513 let prev_remaining = buf.remaining_mut();
5514 match self.datagrams.write(buf) {
5515 true => {
5516 sent_datagrams = true;
5517 sent.non_retransmits = true;
5518 self.stats.frame_tx.datagram += 1;
5519 qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5520 }
5521 false => break,
5522 }
5523 }
5524 if self.datagrams.send_blocked && sent_datagrams {
5525 self.events.push_back(Event::DatagramsUnblocked);
5526 self.datagrams.send_blocked = false;
5527 }
5528
5529 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5530
5531 while let Some(remote_addr) = space.pending.new_tokens.pop() {
5533 if path_exclusive_only {
5534 break;
5535 }
5536 debug_assert_eq!(space_id, SpaceId::Data);
5537 let ConnectionSide::Server { server_config } = &self.side else {
5538 panic!("NEW_TOKEN frames should not be enqueued by clients");
5539 };
5540
5541 if remote_addr != path.remote {
5542 continue;
5547 }
5548
5549 let token = Token::new(
5550 TokenPayload::Validation {
5551 ip: remote_addr.ip(),
5552 issued: server_config.time_source.now(),
5553 },
5554 &mut self.rng,
5555 );
5556 let new_token = NewToken {
5557 token: token.encode(&*server_config.token_key).into(),
5558 };
5559
5560 if buf.remaining_mut() < new_token.size() {
5561 space.pending.new_tokens.push(remote_addr);
5562 break;
5563 }
5564
5565 trace!("NEW_TOKEN");
5566 new_token.encode(buf);
5567 qlog.frame(&Frame::NewToken(new_token));
5568 sent.retransmits
5569 .get_or_create()
5570 .new_tokens
5571 .push(remote_addr);
5572 self.stats.frame_tx.new_token += 1;
5573 }
5574
5575 if !path_exclusive_only && space_id == SpaceId::Data {
5577 sent.stream_frames =
5578 self.streams
5579 .write_stream_frames(buf, self.config.send_fairness, qlog);
5580 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5581 }
5582
5583 while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5586 if let Some(added_address) = space.pending.add_address.pop_last() {
5587 trace!(
5588 seq = %added_address.seq_no,
5589 ip = ?added_address.ip,
5590 port = added_address.port,
5591 "ADD_ADDRESS",
5592 );
5593 added_address.write(buf);
5594 sent.retransmits
5595 .get_or_create()
5596 .add_address
5597 .insert(added_address);
5598 self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5599 qlog.frame(&Frame::AddAddress(added_address));
5600 } else {
5601 break;
5602 }
5603 }
5604
5605 while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5607 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5608 trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5609 removed_address.write(buf);
5610 sent.retransmits
5611 .get_or_create()
5612 .remove_address
5613 .insert(removed_address);
5614 self.stats.frame_tx.remove_address =
5615 self.stats.frame_tx.remove_address.saturating_add(1);
5616 qlog.frame(&Frame::RemoveAddress(removed_address));
5617 } else {
5618 break;
5619 }
5620 }
5621
5622 sent
5623 }
5624
5625 fn populate_acks(
5627 now: Instant,
5628 receiving_ecn: bool,
5629 sent: &mut SentFrames,
5630 path_id: PathId,
5631 space_id: SpaceId,
5632 space: &mut PacketSpace,
5633 is_multipath_negotiated: bool,
5634 buf: &mut impl BufMut,
5635 stats: &mut ConnectionStats,
5636 #[allow(unused)] qlog: &mut QlogSentPacket,
5637 ) {
5638 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5640
5641 debug_assert!(
5642 is_multipath_negotiated || path_id == PathId::ZERO,
5643 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5644 );
5645 if is_multipath_negotiated {
5646 debug_assert!(
5647 space_id == SpaceId::Data || path_id == PathId::ZERO,
5648 "path acks must be sent in 1RTT space (have {space_id:?})"
5649 );
5650 }
5651
5652 let pns = space.for_path(path_id);
5653 let ranges = pns.pending_acks.ranges();
5654 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5655 let ecn = if receiving_ecn {
5656 Some(&pns.ecn_counters)
5657 } else {
5658 None
5659 };
5660 if let Some(max) = ranges.max() {
5661 sent.largest_acked.insert(path_id, max);
5662 }
5663
5664 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5665 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5667 let delay = delay_micros >> ack_delay_exp.into_inner();
5668
5669 if is_multipath_negotiated && space_id == SpaceId::Data {
5670 if !ranges.is_empty() {
5671 trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5672 frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5673 qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5674 stats.frame_tx.path_acks += 1;
5675 }
5676 } else {
5677 trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5678 frame::Ack::encode(delay as _, ranges, ecn, buf);
5679 stats.frame_tx.acks += 1;
5680 qlog.frame_ack(delay, ranges, ecn);
5681 }
5682 }
5683
5684 fn close_common(&mut self) {
5685 trace!("connection closed");
5686 self.timers.reset();
5687 }
5688
5689 fn calculate_end_timer(&self, now: Instant, factor: u32, space: SpaceId) -> Instant {
5690 match space {
5691 SpaceId::Initial | SpaceId::Handshake => {
5692 let duration = factor * self.pto(space, PathId::ZERO);
5693 self.path_data(PathId::ZERO).timer_offset(now, duration)
5694 }
5695 SpaceId::Data => self
5696 .paths
5697 .iter()
5698 .filter_map(|(path_id, state)| {
5699 if state.data.total_sent == 0 && state.data.total_recvd == 0 {
5700 None
5702 } else {
5703 let duration = factor * self.pto(self.highest_space, *path_id);
5704 Some(self.path_data(*path_id).timer_offset(now, duration))
5705 }
5706 })
5707 .max()
5708 .unwrap_or(now), }
5710 }
5711
5712 fn set_close_timer(&mut self, now: Instant) {
5713 let end = self.calculate_end_timer(now, 3, self.highest_space);
5717 self.timers
5718 .set(Timer::Conn(ConnTimer::Close), end, self.qlog.with_time(now));
5719 }
5720
5721 fn handle_peer_params(
5726 &mut self,
5727 params: TransportParameters,
5728 loc_cid: ConnectionId,
5729 rem_cid: ConnectionId,
5730 now: Instant,
5731 ) -> Result<(), TransportError> {
5732 if Some(self.orig_rem_cid) != params.initial_src_cid
5733 || (self.side.is_client()
5734 && (Some(self.initial_dst_cid) != params.original_dst_cid
5735 || self.retry_src_cid != params.retry_src_cid))
5736 {
5737 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5738 "CID authentication failure",
5739 ));
5740 }
5741 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5742 return Err(TransportError::PROTOCOL_VIOLATION(
5743 "multipath must not use zero-length CIDs",
5744 ));
5745 }
5746
5747 self.set_peer_params(params);
5748 self.qlog.emit_peer_transport_params_received(self, now);
5749
5750 Ok(())
5751 }
5752
5753 fn set_peer_params(&mut self, params: TransportParameters) {
5754 self.streams.set_params(¶ms);
5755 self.idle_timeout =
5756 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5757 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5758
5759 if let Some(ref info) = params.preferred_address {
5760 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5762 path_id: None,
5763 sequence: 1,
5764 id: info.connection_id,
5765 reset_token: info.stateless_reset_token,
5766 retire_prior_to: 0,
5767 })
5768 .expect(
5769 "preferred address CID is the first received, and hence is guaranteed to be legal",
5770 );
5771 let remote = self.path_data(PathId::ZERO).remote;
5772 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5773 }
5774 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5775
5776 let mut multipath_enabled = None;
5777 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5778 self.config.get_initial_max_path_id(),
5779 params.initial_max_path_id,
5780 ) {
5781 self.local_max_path_id = local_max_path_id;
5783 self.remote_max_path_id = remote_max_path_id;
5784 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5785 debug!(%initial_max_path_id, "multipath negotiated");
5786 multipath_enabled = Some(initial_max_path_id);
5787 }
5788
5789 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5790 self.config
5791 .max_remote_nat_traversal_addresses
5792 .zip(params.max_remote_nat_traversal_addresses)
5793 {
5794 if let Some(max_initial_paths) =
5795 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5796 {
5797 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5798 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5799 self.iroh_hp =
5800 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5801 debug!(
5802 %max_remote_addresses, %max_local_addresses,
5803 "iroh hole punching negotiated"
5804 );
5805
5806 match self.side() {
5807 Side::Client => {
5808 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5809 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5812 } else if max_local_addresses as u64
5813 > params.active_connection_id_limit.into_inner()
5814 {
5815 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5819 }
5820 }
5821 Side::Server => {
5822 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5823 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5824 }
5825 }
5826 }
5827 } else {
5828 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5829 }
5830 }
5831
5832 self.peer_params = params;
5833 let peer_max_udp_payload_size =
5834 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5835 self.path_data_mut(PathId::ZERO)
5836 .mtud
5837 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5838 }
5839
5840 fn decrypt_packet(
5842 &mut self,
5843 now: Instant,
5844 path_id: PathId,
5845 packet: &mut Packet,
5846 ) -> Result<Option<u64>, Option<TransportError>> {
5847 let result = packet_crypto::decrypt_packet_body(
5848 packet,
5849 path_id,
5850 &self.spaces,
5851 self.zero_rtt_crypto.as_ref(),
5852 self.key_phase,
5853 self.prev_crypto.as_ref(),
5854 self.next_crypto.as_ref(),
5855 )?;
5856
5857 let result = match result {
5858 Some(r) => r,
5859 None => return Ok(None),
5860 };
5861
5862 if result.outgoing_key_update_acked {
5863 if let Some(prev) = self.prev_crypto.as_mut() {
5864 prev.end_packet = Some((result.number, now));
5865 self.set_key_discard_timer(now, packet.header.space());
5866 }
5867 }
5868
5869 if result.incoming_key_update {
5870 trace!("key update authenticated");
5871 self.update_keys(Some((result.number, now)), true);
5872 self.set_key_discard_timer(now, packet.header.space());
5873 }
5874
5875 Ok(Some(result.number))
5876 }
5877
5878 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5879 trace!("executing key update");
5880 let new = self
5884 .crypto
5885 .next_1rtt_keys()
5886 .expect("only called for `Data` packets");
5887 self.key_phase_size = new
5888 .local
5889 .confidentiality_limit()
5890 .saturating_sub(KEY_UPDATE_MARGIN);
5891 let old = mem::replace(
5892 &mut self.spaces[SpaceId::Data]
5893 .crypto
5894 .as_mut()
5895 .unwrap() .packet,
5897 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5898 );
5899 self.spaces[SpaceId::Data]
5900 .iter_paths_mut()
5901 .for_each(|s| s.sent_with_keys = 0);
5902 self.prev_crypto = Some(PrevCrypto {
5903 crypto: old,
5904 end_packet,
5905 update_unacked: remote,
5906 });
5907 self.key_phase = !self.key_phase;
5908 }
5909
5910 fn peer_supports_ack_frequency(&self) -> bool {
5911 self.peer_params.min_ack_delay.is_some()
5912 }
5913
5914 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5919 debug_assert_eq!(
5920 self.highest_space,
5921 SpaceId::Data,
5922 "immediate ack must be written in the data space"
5923 );
5924 self.spaces[self.highest_space]
5925 .for_path(path_id)
5926 .immediate_ack_pending = true;
5927 }
5928
5929 #[cfg(test)]
5931 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5932 let (path_id, first_decode, remaining) = match &event.0 {
5933 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5934 path_id,
5935 first_decode,
5936 remaining,
5937 ..
5938 }) => (path_id, first_decode, remaining),
5939 _ => return None,
5940 };
5941
5942 if remaining.is_some() {
5943 panic!("Packets should never be coalesced in tests");
5944 }
5945
5946 let decrypted_header = packet_crypto::unprotect_header(
5947 first_decode.clone(),
5948 &self.spaces,
5949 self.zero_rtt_crypto.as_ref(),
5950 self.peer_params.stateless_reset_token,
5951 )?;
5952
5953 let mut packet = decrypted_header.packet?;
5954 packet_crypto::decrypt_packet_body(
5955 &mut packet,
5956 *path_id,
5957 &self.spaces,
5958 self.zero_rtt_crypto.as_ref(),
5959 self.key_phase,
5960 self.prev_crypto.as_ref(),
5961 self.next_crypto.as_ref(),
5962 )
5963 .ok()?;
5964
5965 Some(packet.payload.to_vec())
5966 }
5967
5968 #[cfg(test)]
5971 pub(crate) fn bytes_in_flight(&self) -> u64 {
5972 self.path_data(PathId::ZERO).in_flight.bytes
5974 }
5975
5976 #[cfg(test)]
5978 pub(crate) fn congestion_window(&self) -> u64 {
5979 let path = self.path_data(PathId::ZERO);
5980 path.congestion
5981 .window()
5982 .saturating_sub(path.in_flight.bytes)
5983 }
5984
5985 #[cfg(test)]
5987 pub(crate) fn is_idle(&self) -> bool {
5988 let current_timers = self.timers.values();
5989 current_timers
5990 .into_iter()
5991 .filter(|(timer, _)| {
5992 !matches!(
5993 timer,
5994 Timer::Conn(ConnTimer::KeepAlive)
5995 | Timer::PerPath(_, PathTimer::PathKeepAlive)
5996 | Timer::Conn(ConnTimer::PushNewCid)
5997 | Timer::Conn(ConnTimer::KeyDiscard)
5998 )
5999 })
6000 .min_by_key(|(_, time)| *time)
6001 .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6002 }
6003
6004 #[cfg(test)]
6006 pub(crate) fn using_ecn(&self) -> bool {
6007 self.path_data(PathId::ZERO).sending_ecn
6008 }
6009
6010 #[cfg(test)]
6012 pub(crate) fn total_recvd(&self) -> u64 {
6013 self.path_data(PathId::ZERO).total_recvd
6014 }
6015
6016 #[cfg(test)]
6017 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6018 self.local_cid_state
6019 .get(&PathId::ZERO)
6020 .unwrap()
6021 .active_seq()
6022 }
6023
6024 #[cfg(test)]
6025 #[track_caller]
6026 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6027 self.local_cid_state
6028 .get(&PathId(path_id))
6029 .unwrap()
6030 .active_seq()
6031 }
6032
6033 #[cfg(test)]
6036 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6037 let n = self
6038 .local_cid_state
6039 .get_mut(&PathId::ZERO)
6040 .unwrap()
6041 .assign_retire_seq(v);
6042 self.endpoint_events
6043 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6044 }
6045
6046 #[cfg(test)]
6048 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6049 self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6050 }
6051
6052 #[cfg(test)]
6054 pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6055 self.path_data(path_id).current_mtu()
6056 }
6057
6058 #[cfg(test)]
6060 pub(crate) fn trigger_path_validation(&mut self) {
6061 for path in self.paths.values_mut() {
6062 path.data.send_new_challenge = true;
6063 }
6064 }
6065
6066 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6077 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6078 path.data.send_new_challenge
6079 || path
6080 .prev
6081 .as_ref()
6082 .is_some_and(|(_, path)| path.send_new_challenge)
6083 || !path.data.path_responses.is_empty()
6084 });
6085 let other = self.streams.can_send_stream_data()
6086 || self
6087 .datagrams
6088 .outgoing
6089 .front()
6090 .is_some_and(|x| x.size(true) <= max_size);
6091 SendableFrames {
6092 acks: false,
6093 other,
6094 close: false,
6095 path_exclusive,
6096 }
6097 }
6098
6099 fn kill(&mut self, reason: ConnectionError) {
6101 self.close_common();
6102 self.state.move_to_drained(Some(reason));
6103 self.endpoint_events.push_back(EndpointEventInner::Drained);
6104 }
6105
6106 pub fn current_mtu(&self) -> u16 {
6113 self.paths
6114 .iter()
6115 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6116 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6117 .min()
6118 .expect("There is always at least one available path")
6119 }
6120
6121 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6128 let pn_len = PacketNumber::new(
6129 pn,
6130 self.spaces[SpaceId::Data]
6131 .for_path(path)
6132 .largest_acked_packet
6133 .unwrap_or(0),
6134 )
6135 .len();
6136
6137 1 + self
6139 .rem_cids
6140 .get(&path)
6141 .map(|cids| cids.active().len())
6142 .unwrap_or(20) + pn_len
6144 + self.tag_len_1rtt()
6145 }
6146
6147 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6148 let pn_len = 4;
6149
6150 let cid_len = self
6151 .rem_cids
6152 .values()
6153 .map(|cids| cids.active().len())
6154 .max()
6155 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6159 }
6160
6161 fn tag_len_1rtt(&self) -> usize {
6162 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6163 Some(crypto) => Some(&*crypto.packet.local),
6164 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6165 };
6166 key.map_or(16, |x| x.tag_len())
6170 }
6171
6172 fn on_path_validated(&mut self, path_id: PathId) {
6174 self.path_data_mut(path_id).validated = true;
6175 let ConnectionSide::Server { server_config } = &self.side else {
6176 return;
6177 };
6178 let remote_addr = self.path_data(path_id).remote;
6179 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6180 new_tokens.clear();
6181 for _ in 0..server_config.validation_token.sent {
6182 new_tokens.push(remote_addr);
6183 }
6184 }
6185
6186 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6188 if let Some(path) = self.paths.get_mut(&path_id) {
6189 path.data.status.remote_update(status, status_seq_no);
6190 } else {
6191 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6192 }
6193 self.events.push_back(
6194 PathEvent::RemoteStatus {
6195 id: path_id,
6196 status,
6197 }
6198 .into(),
6199 );
6200 }
6201
6202 fn max_path_id(&self) -> Option<PathId> {
6211 if self.is_multipath_negotiated() {
6212 Some(self.remote_max_path_id.min(self.local_max_path_id))
6213 } else {
6214 None
6215 }
6216 }
6217
6218 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6220 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6221 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6222 };
6223 Ok(())
6224 }
6225
6226 pub fn remove_nat_traversal_address(
6230 &mut self,
6231 address: SocketAddr,
6232 ) -> Result<(), iroh_hp::Error> {
6233 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6234 self.spaces[SpaceId::Data]
6235 .pending
6236 .remove_address
6237 .insert(removed);
6238 }
6239 Ok(())
6240 }
6241
6242 pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6244 self.iroh_hp.get_local_nat_traversal_addresses()
6245 }
6246
6247 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6249 Ok(self
6250 .iroh_hp
6251 .client_side()?
6252 .get_remote_nat_traversal_addresses())
6253 }
6254
6255 pub fn initiate_nat_traversal_round(
6263 &mut self,
6264 now: Instant,
6265 ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6266 if self.state.is_closed() {
6267 return Err(iroh_hp::Error::Closed);
6268 }
6269
6270 let client_state = self.iroh_hp.client_side_mut()?;
6271 let iroh_hp::NatTraversalRound {
6272 new_round,
6273 reach_out_at,
6274 addresses_to_probe,
6275 prev_round_path_ids,
6276 } = client_state.initiate_nat_traversal_round()?;
6277
6278 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6279
6280 for path_id in prev_round_path_ids {
6281 let validated = self
6284 .path(path_id)
6285 .map(|path| path.validated)
6286 .unwrap_or(false);
6287
6288 if !validated {
6289 let _ = self.close_path(
6290 now,
6291 path_id,
6292 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6293 );
6294 }
6295 }
6296
6297 let mut err = None;
6298
6299 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6300 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6301 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6302
6303 for (ip, port) in addresses_to_probe {
6304 let remote = match ip {
6306 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6307 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6308 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6309 IpAddr::V6(_) => {
6310 trace!("not using IPv6 nat candidate for IPv4 socket");
6311 continue;
6312 }
6313 };
6314 match self.open_path_ensure(remote, PathStatus::Backup, now) {
6315 Ok((path_id, path_was_known)) if !path_was_known => {
6316 path_ids.push(path_id);
6317 probed_addresses.push(remote);
6318 }
6319 Ok((path_id, _)) => {
6320 trace!(%path_id, %remote,"nat traversal: path existed for remote")
6321 }
6322 Err(e) => {
6323 debug!(%remote, %e,"nat traversal: failed to probe remote");
6324 err.get_or_insert(e);
6325 }
6326 }
6327 }
6328
6329 if let Some(err) = err {
6330 if probed_addresses.is_empty() {
6332 return Err(iroh_hp::Error::Multipath(err));
6333 }
6334 }
6335
6336 self.iroh_hp
6337 .client_side_mut()
6338 .expect("connection side validated")
6339 .set_round_path_ids(path_ids);
6340
6341 Ok(probed_addresses)
6342 }
6343}
6344
6345impl fmt::Debug for Connection {
6346 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6347 f.debug_struct("Connection")
6348 .field("handshake_cid", &self.handshake_cid)
6349 .finish()
6350 }
6351}
6352
6353#[derive(Debug, Copy, Clone, PartialEq, Eq)]
6354enum PathBlocked {
6355 No,
6356 AntiAmplification,
6357 Congestion,
6358 Pacing,
6359}
6360
6361enum ConnectionSide {
6363 Client {
6364 token: Bytes,
6366 token_store: Arc<dyn TokenStore>,
6367 server_name: String,
6368 },
6369 Server {
6370 server_config: Arc<ServerConfig>,
6371 },
6372}
6373
6374impl ConnectionSide {
6375 fn remote_may_migrate(&self, state: &State) -> bool {
6376 match self {
6377 Self::Server { server_config } => server_config.migration,
6378 Self::Client { .. } => {
6379 if let Some(hs) = state.as_handshake() {
6380 hs.allow_server_migration
6381 } else {
6382 false
6383 }
6384 }
6385 }
6386 }
6387
6388 fn is_client(&self) -> bool {
6389 self.side().is_client()
6390 }
6391
6392 fn is_server(&self) -> bool {
6393 self.side().is_server()
6394 }
6395
6396 fn side(&self) -> Side {
6397 match *self {
6398 Self::Client { .. } => Side::Client,
6399 Self::Server { .. } => Side::Server,
6400 }
6401 }
6402}
6403
6404impl From<SideArgs> for ConnectionSide {
6405 fn from(side: SideArgs) -> Self {
6406 match side {
6407 SideArgs::Client {
6408 token_store,
6409 server_name,
6410 } => Self::Client {
6411 token: token_store.take(&server_name).unwrap_or_default(),
6412 token_store,
6413 server_name,
6414 },
6415 SideArgs::Server {
6416 server_config,
6417 pref_addr_cid: _,
6418 path_validated: _,
6419 } => Self::Server { server_config },
6420 }
6421 }
6422}
6423
6424pub(crate) enum SideArgs {
6426 Client {
6427 token_store: Arc<dyn TokenStore>,
6428 server_name: String,
6429 },
6430 Server {
6431 server_config: Arc<ServerConfig>,
6432 pref_addr_cid: Option<ConnectionId>,
6433 path_validated: bool,
6434 },
6435}
6436
6437impl SideArgs {
6438 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6439 match *self {
6440 Self::Client { .. } => None,
6441 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6442 }
6443 }
6444
6445 pub(crate) fn path_validated(&self) -> bool {
6446 match *self {
6447 Self::Client { .. } => true,
6448 Self::Server { path_validated, .. } => path_validated,
6449 }
6450 }
6451
6452 pub(crate) fn side(&self) -> Side {
6453 match *self {
6454 Self::Client { .. } => Side::Client,
6455 Self::Server { .. } => Side::Server,
6456 }
6457 }
6458}
6459
6460#[derive(Debug, Error, Clone, PartialEq, Eq)]
6462pub enum ConnectionError {
6463 #[error("peer doesn't implement any supported version")]
6465 VersionMismatch,
6466 #[error(transparent)]
6468 TransportError(#[from] TransportError),
6469 #[error("aborted by peer: {0}")]
6471 ConnectionClosed(frame::ConnectionClose),
6472 #[error("closed by peer: {0}")]
6474 ApplicationClosed(frame::ApplicationClose),
6475 #[error("reset by peer")]
6477 Reset,
6478 #[error("timed out")]
6484 TimedOut,
6485 #[error("closed")]
6487 LocallyClosed,
6488 #[error("CIDs exhausted")]
6492 CidsExhausted,
6493}
6494
6495impl From<Close> for ConnectionError {
6496 fn from(x: Close) -> Self {
6497 match x {
6498 Close::Connection(reason) => Self::ConnectionClosed(reason),
6499 Close::Application(reason) => Self::ApplicationClosed(reason),
6500 }
6501 }
6502}
6503
6504impl From<ConnectionError> for io::Error {
6506 fn from(x: ConnectionError) -> Self {
6507 use ConnectionError::*;
6508 let kind = match x {
6509 TimedOut => io::ErrorKind::TimedOut,
6510 Reset => io::ErrorKind::ConnectionReset,
6511 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6512 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6513 io::ErrorKind::Other
6514 }
6515 };
6516 Self::new(kind, x)
6517 }
6518}
6519
6520#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6523pub enum PathError {
6524 #[error("multipath extension not negotiated")]
6526 MultipathNotNegotiated,
6527 #[error("the server side may not open a path")]
6529 ServerSideNotAllowed,
6530 #[error("maximum number of concurrent paths reached")]
6532 MaxPathIdReached,
6533 #[error("remoted CIDs exhausted")]
6535 RemoteCidsExhausted,
6536 #[error("path validation failed")]
6538 ValidationFailed,
6539 #[error("invalid remote address")]
6541 InvalidRemoteAddress(SocketAddr),
6542}
6543
6544#[derive(Debug, Error, Clone, Eq, PartialEq)]
6546pub enum ClosePathError {
6547 #[error("closed path")]
6549 ClosedPath,
6550 #[error("last open path")]
6552 LastOpenPath,
6553}
6554
6555#[derive(Debug, Error, Clone, Copy)]
6556#[error("Multipath extension not negotiated")]
6557pub struct MultipathNotNegotiated {
6558 _private: (),
6559}
6560
6561#[derive(Debug)]
6563pub enum Event {
6564 HandshakeDataReady,
6566 Connected,
6568 HandshakeConfirmed,
6570 ConnectionLost {
6574 reason: ConnectionError,
6576 },
6577 Stream(StreamEvent),
6579 DatagramReceived,
6581 DatagramsUnblocked,
6583 Path(PathEvent),
6585 NatTraversal(iroh_hp::Event),
6587}
6588
6589impl From<PathEvent> for Event {
6590 fn from(source: PathEvent) -> Self {
6591 Self::Path(source)
6592 }
6593}
6594
6595fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6596 Duration::from_micros(params.max_ack_delay.0 * 1000)
6597}
6598
6599const MAX_BACKOFF_EXPONENT: u32 = 16;
6601
6602const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6610
6611const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6617 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6618
6619const KEY_UPDATE_MARGIN: u64 = 10_000;
6623
6624#[derive(Default)]
6625struct SentFrames {
6626 retransmits: ThinRetransmits,
6627 largest_acked: FxHashMap<PathId, u64>,
6629 stream_frames: StreamMetaVec,
6630 non_retransmits: bool,
6632 requires_padding: bool,
6634}
6635
6636impl SentFrames {
6637 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6639 !self.largest_acked.is_empty()
6640 && !self.non_retransmits
6641 && self.stream_frames.is_empty()
6642 && self.retransmits.is_empty(streams)
6643 }
6644}
6645
6646fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6654 match (x, y) {
6655 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6656 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6657 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6658 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6659 }
6660}
6661
6662#[cfg(test)]
6663mod tests {
6664 use super::*;
6665
6666 #[test]
6667 fn negotiate_max_idle_timeout_commutative() {
6668 let test_params = [
6669 (None, None, None),
6670 (None, Some(VarInt(0)), None),
6671 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6672 (Some(VarInt(0)), Some(VarInt(0)), None),
6673 (
6674 Some(VarInt(2)),
6675 Some(VarInt(0)),
6676 Some(Duration::from_millis(2)),
6677 ),
6678 (
6679 Some(VarInt(1)),
6680 Some(VarInt(4)),
6681 Some(Duration::from_millis(1)),
6682 ),
6683 ];
6684
6685 for (left, right, result) in test_params {
6686 assert_eq!(negotiate_max_idle_timeout(left, right), result);
6687 assert_eq!(negotiate_max_idle_timeout(right, left), result);
6688 }
6689 }
6690}