1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::NonZeroU32,
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23 TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 coding::BufMutExt,
27 config::{ServerConfig, TransportConfig},
28 congestion::Controller,
29 connection::{
30 qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
31 spaces::LostPacket,
32 timer::{ConnTimer, PathTimer},
33 },
34 crypto::{self, KeyPair, Keys, PacketKey},
35 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
36 iroh_hp,
37 packet::{
38 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
39 PacketNumber, PartialDecode, SpaceId,
40 },
41 range_set::ArrayRangeSet,
42 shared::{
43 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44 EndpointEvent, EndpointEventInner,
45 },
46 token::{ResetToken, Token, TokenPayload},
47 transport_parameters::TransportParameters,
48};
49
50mod ack_frequency;
51use ack_frequency::AckFrequencyState;
52
53mod assembler;
54pub use assembler::Chunk;
55
56mod cid_state;
57use cid_state::CidState;
58
59mod datagrams;
60use datagrams::DatagramState;
61pub use datagrams::{Datagrams, SendDatagramError};
62
63mod mtud;
64mod pacing;
65
66mod packet_builder;
67use packet_builder::{PacketBuilder, PadDatagram};
68
69mod packet_crypto;
70use packet_crypto::{PrevCrypto, ZeroRttCrypto};
71
72mod paths;
73pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
74use paths::{PathData, PathState};
75
76pub(crate) mod qlog;
77
78mod send_buffer;
79
80mod spaces;
81#[cfg(fuzzing)]
82pub use spaces::Retransmits;
83#[cfg(not(fuzzing))]
84use spaces::Retransmits;
85use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
86
87mod stats;
88pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
89
90mod streams;
91#[cfg(fuzzing)]
92pub use streams::StreamsState;
93#[cfg(not(fuzzing))]
94use streams::StreamsState;
95pub use streams::{
96 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
97 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
98};
99
100mod timer;
101use timer::{Timer, TimerTable};
102
103mod transmit_buf;
104use transmit_buf::TransmitBuf;
105
106mod state;
107
108#[cfg(not(fuzzing))]
109use state::State;
110#[cfg(fuzzing)]
111pub use state::State;
112use state::StateType;
113
/// State kept for a single connection: configuration, crypto session, paths,
/// packet spaces, timers, streams, datagrams and statistics.
pub struct Connection {
    /// Endpoint-wide configuration, shared via `Arc`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration for this connection, shared via `Arc`.
    config: Arc<TransportConfig>,
    /// Random number generator, seeded from the `rng_seed` handed to `new`.
    rng: StdRng,
    /// Crypto session of this connection.
    crypto: Box<dyn crypto::Session>,
    /// Initialised from the `loc_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// Initialised from the `rem_cid` argument of `new`.
    rem_handshake_cid: ConnectionId,
    /// Local IP address, if known; reported as `src_ip` on outgoing transmits.
    local_ip: Option<IpAddr>,
    /// Per-path state keyed by path ID. `new` inserts `PathId::ZERO`.
    paths: BTreeMap<PathId, PathState>,
    /// Bumped (wrapping) for every path created by `ensure_path` and handed to
    /// `PathData::new`; the initial path is created with `0`.
    path_counter: u64,
    /// MTUD flag forwarded to every `PathData::new` call.
    allow_mtud: bool,
    /// Connection state; starts in the handshake state.
    state: State,
    /// Side-specific state, built from the `SideArgs` passed to `new`.
    side: ConnectionSide,
    /// Starts as `false`.
    zero_rtt_enabled: bool,
    /// Starts as `None`.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Starts as `false`.
    key_phase: bool,
    /// Initialised to a random value in `10..1000`.
    // NOTE(review): presumably the packet count that triggers a key update — confirm.
    key_phase_size: u64,
    /// Peer transport parameters; `TransportParameters::default()` until replaced.
    peer_params: TransportParameters,
    /// Initialised from the `rem_cid` argument of `new`.
    orig_rem_cid: ConnectionId,
    /// Initialised from the `init_cid` argument of `new`.
    initial_dst_cid: ConnectionId,
    /// Starts as `None`.
    retry_src_cid: Option<ConnectionId>,
    /// Queue of application-facing events, drained first by `poll`.
    events: VecDeque<Event>,
    /// Queue of events for the endpoint, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Set in `new` when `config.allow_spin` is on and a 7-in-8 random draw succeeds.
    spin_enabled: bool,
    /// Starts as `false`.
    spin: bool,
    /// Packet spaces in the order Initial, Handshake, Data; indexed by `SpaceId`.
    spaces: [PacketSpace; 3],
    /// Starts as `SpaceId::Initial`.
    highest_space: SpaceId,
    /// Starts as `None`.
    prev_crypto: Option<PrevCrypto>,
    /// Starts as `None`.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Starts as `false`.
    accepted_0rtt: bool,
    /// Starts as `true`.
    permit_idle_reset: bool,
    /// Idle timeout taken from `config.max_idle_timeout` (read as milliseconds);
    /// `None` when the configured value is absent or zero.
    idle_timeout: Option<Duration>,
    /// Timers of this connection; `poll_timeout` peeks into this table.
    timers: TimerTable,
    /// Starts at `0`.
    authentication_failures: u64,

    /// Whether a close packet is still to be sent: in the closed/draining states
    /// `poll_transmit` sends nothing unless this is set, and clears it once the
    /// close has been written in the highest space.
    close: bool,

    /// ACK frequency state, seeded in `new` with the max ACK delay of the
    /// default transport parameters.
    ack_frequency: AckFrequencyState,

    /// Starts as `false`; passed along when ACK frames are populated.
    receiving_ecn: bool,
    /// Starts at `0`.
    total_authed_packets: u64,
    /// Updated by `poll_transmit`: `true` when nothing was written and sending
    /// was not blocked by congestion control.
    app_limited: bool,

    /// Starts at `0`.
    next_observed_addr_seq_no: VarInt,

    /// Stream state, exposed through `streams`, `recv_stream` and `send_stream`.
    streams: StreamsState,
    /// Remote connection IDs per path; the entry is removed when the path is
    /// closed via `close_path`.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Local connection ID state per path; `new` creates the entry for `PathId::ZERO`.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// Datagram state.
    datagrams: DatagramState,
    /// Connection-level statistics.
    stats: ConnectionStats,
    /// Per-path statistics, created on demand.
    path_stats: FxHashMap<PathId, PathStats>,
    /// Version value handed to `new`.
    version: u32,

    /// Starts at `NonZeroU32::MIN`.
    max_concurrent_paths: NonZeroU32,
    /// Starts at `PathId::ZERO`; `close_path` raises it by one via `set_max_path_id`.
    local_max_path_id: PathId,
    /// Starts at `PathId::ZERO`; `open_path` refuses path IDs above this value.
    remote_max_path_id: PathId,
    /// Starts at `PathId::ZERO`.
    max_path_id_with_cids: PathId,
    /// IDs of paths that have been marked abandoned (e.g. by `close_path`).
    abandoned_paths: FxHashSet<PathId>,

    /// `iroh_hp` state; its server side can supply probes that `poll_transmit`
    /// sends before anything else.
    iroh_hp: iroh_hp::State,
    /// Sink for qlog events.
    qlog: QlogSink,
}
321
322impl Connection {
/// Builds the state of a freshly created connection.
///
/// Creates the three packet spaces (only the Initial space receives keys
/// here), registers `PathId::ZERO` towards `remote`, and gives every other
/// field its starting value. If the side arguments report the path as
/// validated, `on_path_validated` is invoked for `PathId::ZERO`. On the client
/// side `write_crypto` and `init_0rtt` run before the connection is returned.
pub(crate) fn new(
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    init_cid: ConnectionId,
    loc_cid: ConnectionId,
    rem_cid: ConnectionId,
    remote: SocketAddr,
    local_ip: Option<IpAddr>,
    crypto: Box<dyn crypto::Session>,
    cid_gen: &dyn ConnectionIdGenerator,
    now: Instant,
    version: u32,
    allow_mtud: bool,
    rng_seed: [u8; 32],
    side_args: SideArgs,
    qlog: QlogSink,
) -> Self {
    // Read what we need from `side_args` before it is consumed by the conversion below.
    let pref_addr_cid = side_args.pref_addr_cid();
    let path_validated = side_args.path_validated();
    let connection_side = ConnectionSide::from(side_args);
    let side = connection_side.side();
    // All randomness of this connection derives from `rng_seed`; the order of the
    // draws below therefore determines the resulting values.
    let mut rng = StdRng::from_seed(rng_seed);
    let initial_space = {
        let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
        // Initial keys are derived from `init_cid` and our side.
        space.crypto = Some(crypto.initial_keys(init_cid, side));
        space
    };
    let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
    // Test builds may request deterministic packet numbers for the data space.
    #[cfg(test)]
    let data_space = match config.deterministic_packet_numbers {
        true => PacketSpace::new_deterministic(now, SpaceId::Data),
        false => PacketSpace::new(now, SpaceId::Data, &mut rng),
    };
    #[cfg(not(test))]
    let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
    let state = State::handshake(state::Handshake {
        rem_cid_set: side.is_server(),
        expected_token: Bytes::new(),
        client_hello: None,
        allow_server_migration: side.is_client(),
    });
    let local_cid_state = FxHashMap::from_iter([(
        PathId::ZERO,
        CidState::new(
            cid_gen.cid_len(),
            cid_gen.cid_lifetime(),
            now,
            // NOTE(review): presumably the count of CIDs already issued; one more
            // when a preferred-address CID exists — confirm against `CidState::new`.
            if pref_addr_cid.is_some() { 2 } else { 1 },
        ),
    )]);

    // The initial path is created without a peer UDP payload size and with path
    // counter 0 (compare the arguments `ensure_path` passes for later paths).
    let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
    path.open = true;
    let mut this = Self {
        endpoint_config,
        crypto,
        handshake_cid: loc_cid,
        rem_handshake_cid: rem_cid,
        local_cid_state,
        paths: BTreeMap::from_iter([(
            PathId::ZERO,
            PathState {
                data: path,
                prev: None,
            },
        )]),
        path_counter: 0,
        allow_mtud,
        local_ip,
        state,
        side: connection_side,
        zero_rtt_enabled: false,
        zero_rtt_crypto: None,
        key_phase: false,
        // Random value in `10..1000`.
        key_phase_size: rng.random_range(10..1000),
        peer_params: TransportParameters::default(),
        orig_rem_cid: rem_cid,
        initial_dst_cid: init_cid,
        retry_src_cid: None,
        events: VecDeque::new(),
        endpoint_events: VecDeque::new(),
        // Only draws from the RNG when the configuration allows the spin bit;
        // the draw then succeeds with probability 7/8.
        spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
        spin: false,
        spaces: [initial_space, handshake_space, data_space],
        highest_space: SpaceId::Initial,
        prev_crypto: None,
        next_crypto: None,
        accepted_0rtt: false,
        permit_idle_reset: true,
        // An absent or zero `max_idle_timeout` disables the idle timeout; any
        // other value is read as milliseconds.
        idle_timeout: match config.max_idle_timeout {
            None | Some(VarInt(0)) => None,
            Some(dur) => Some(Duration::from_millis(dur.0)),
        },
        timers: TimerTable::default(),
        authentication_failures: 0,
        close: false,

        // The peer's parameters are not known yet, so start from the defaults.
        ack_frequency: AckFrequencyState::new(get_max_ack_delay(
            &TransportParameters::default(),
        )),

        app_limited: false,
        receiving_ecn: false,
        total_authed_packets: 0,

        next_observed_addr_seq_no: 0u32.into(),

        streams: StreamsState::new(
            side,
            config.max_concurrent_uni_streams,
            config.max_concurrent_bidi_streams,
            config.send_window,
            config.receive_window,
            config.stream_receive_window,
        ),
        datagrams: DatagramState::default(),
        // `config` and `rng` are moved into the struct only after their last use above.
        config,
        rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
        rng,
        stats: ConnectionStats::default(),
        path_stats: Default::default(),
        version,

        max_concurrent_paths: NonZeroU32::MIN,
        local_max_path_id: PathId::ZERO,
        remote_max_path_id: PathId::ZERO,
        max_path_id_with_cids: PathId::ZERO,
        abandoned_paths: Default::default(),

        iroh_hp: Default::default(),
        qlog,
    };
    if path_validated {
        this.on_path_validated(PathId::ZERO);
    }
    if side.is_client() {
        // NOTE(review): presumably queues the first handshake data — confirm in `write_crypto`.
        this.write_crypto();
        this.init_0rtt(now);
    }
    this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
    this
}
475
476 #[must_use]
484 pub fn poll_timeout(&mut self) -> Option<Instant> {
485 self.timers.peek()
486 }
487
488 #[must_use]
494 pub fn poll(&mut self) -> Option<Event> {
495 if let Some(x) = self.events.pop_front() {
496 return Some(x);
497 }
498
499 if let Some(event) = self.streams.poll() {
500 return Some(Event::Stream(event));
501 }
502
503 if let Some(reason) = self.state.take_error() {
504 return Some(Event::ConnectionLost { reason });
505 }
506
507 None
508 }
509
510 #[must_use]
512 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
513 self.endpoint_events.pop_front().map(EndpointEvent)
514 }
515
516 #[must_use]
518 pub fn streams(&mut self) -> Streams<'_> {
519 Streams {
520 state: &mut self.streams,
521 conn_state: &self.state,
522 }
523 }
524
525 #[must_use]
527 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
528 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
529 RecvStream {
530 id,
531 state: &mut self.streams,
532 pending: &mut self.spaces[SpaceId::Data].pending,
533 }
534 }
535
536 #[must_use]
538 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
539 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
540 SendStream {
541 id,
542 state: &mut self.streams,
543 pending: &mut self.spaces[SpaceId::Data].pending,
544 conn_state: &self.state,
545 }
546 }
547
548 pub fn open_path_ensure(
555 &mut self,
556 remote: SocketAddr,
557 initial_status: PathStatus,
558 now: Instant,
559 rtt_hint: Option<Duration>,
560 ) -> Result<(PathId, bool), PathError> {
561 match self
562 .paths
563 .iter()
564 .find(|(_id, path)| path.data.remote == remote)
565 {
566 Some((path_id, _state)) => Ok((*path_id, true)),
567 None => self
568 .open_path(remote, initial_status, now, rtt_hint)
569 .map(|id| (id, false)),
570 }
571 }
572
573 pub fn open_path(
578 &mut self,
579 remote: SocketAddr,
580 initial_status: PathStatus,
581 now: Instant,
582 rtt_hint: Option<Duration>,
583 ) -> Result<PathId, PathError> {
584 if !self.is_multipath_negotiated() {
585 return Err(PathError::MultipathNotNegotiated);
586 }
587 if self.side().is_server() {
588 return Err(PathError::ServerSideNotAllowed);
589 }
590
591 let max_abandoned = self.abandoned_paths.iter().max().copied();
592 let max_used = self.paths.keys().last().copied();
593 let path_id = max_abandoned
594 .max(max_used)
595 .unwrap_or(PathId::ZERO)
596 .saturating_add(1u8);
597
598 if Some(path_id) > self.max_path_id() {
599 return Err(PathError::MaxPathIdReached);
600 }
601 if path_id > self.remote_max_path_id {
602 self.spaces[SpaceId::Data].pending.paths_blocked = true;
603 return Err(PathError::MaxPathIdReached);
604 }
605 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
606 self.spaces[SpaceId::Data]
607 .pending
608 .path_cids_blocked
609 .push(path_id);
610 return Err(PathError::RemoteCidsExhausted);
611 }
612
613 let path = self.ensure_path(path_id, remote, now, None, rtt_hint);
614 path.status.local_update(initial_status);
615
616 Ok(path_id)
617 }
618
619 pub fn close_path(
625 &mut self,
626 now: Instant,
627 path_id: PathId,
628 error_code: VarInt,
629 ) -> Result<(), ClosePathError> {
630 if self.abandoned_paths.contains(&path_id)
631 || Some(path_id) > self.max_path_id()
632 || !self.paths.contains_key(&path_id)
633 {
634 return Err(ClosePathError::ClosedPath);
635 }
636 if self
637 .paths
638 .iter()
639 .any(|(id, path)| {
641 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
642 })
643 .not()
644 {
645 return Err(ClosePathError::LastOpenPath);
646 }
647
648 self.spaces[SpaceId::Data]
650 .pending
651 .path_abandon
652 .insert(path_id, error_code.into());
653
654 let pending_space = &mut self.spaces[SpaceId::Data].pending;
656 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
657 pending_space.path_cids_blocked.retain(|&id| id != path_id);
658 pending_space.path_status.retain(|&id| id != path_id);
659
660 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
662 for sent_packet in space.sent_packets.values_mut() {
663 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
664 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
665 retransmits.path_cids_blocked.retain(|&id| id != path_id);
666 retransmits.path_status.retain(|&id| id != path_id);
667 }
668 }
669 }
670
671 self.rem_cids.remove(&path_id);
677 self.endpoint_events
678 .push_back(EndpointEventInner::RetireResetToken(path_id));
679
680 let pto = self.pto_max_path(SpaceId::Data, false);
681
682 let path = self.paths.get_mut(&path_id).expect("checked above");
683
684 path.data.last_allowed_receive = Some(now + 3 * pto);
686 self.abandoned_paths.insert(path_id);
687
688 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
689
690 self.timers.set(
695 Timer::PerPath(path_id, PathTimer::DiscardPath),
696 now + 6 * pto,
697 self.qlog.with_time(now),
698 );
699 Ok(())
700 }
701
702 #[track_caller]
706 fn path_data(&self, path_id: PathId) -> &PathData {
707 if let Some(data) = self.paths.get(&path_id) {
708 &data.data
709 } else {
710 panic!(
711 "unknown path: {path_id}, currently known paths: {:?}",
712 self.paths.keys().collect::<Vec<_>>()
713 );
714 }
715 }
716
717 fn path(&self, path_id: PathId) -> Option<&PathData> {
719 self.paths.get(&path_id).map(|path_state| &path_state.data)
720 }
721
722 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
724 self.paths
725 .get_mut(&path_id)
726 .map(|path_state| &mut path_state.data)
727 }
728
729 pub fn paths(&self) -> Vec<PathId> {
733 self.paths.keys().copied().collect()
734 }
735
736 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
738 self.path(path_id)
739 .map(PathData::local_status)
740 .ok_or(ClosedPath { _private: () })
741 }
742
743 pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
745 self.path(path_id)
746 .map(|path| path.remote)
747 .ok_or(ClosedPath { _private: () })
748 }
749
750 pub fn set_path_status(
754 &mut self,
755 path_id: PathId,
756 status: PathStatus,
757 ) -> Result<PathStatus, SetPathStatusError> {
758 if !self.is_multipath_negotiated() {
759 return Err(SetPathStatusError::MultipathNotNegotiated);
760 }
761 let path = self
762 .path_mut(path_id)
763 .ok_or(SetPathStatusError::ClosedPath)?;
764 let prev = match path.status.local_update(status) {
765 Some(prev) => {
766 self.spaces[SpaceId::Data]
767 .pending
768 .path_status
769 .insert(path_id);
770 prev
771 }
772 None => path.local_status(),
773 };
774 Ok(prev)
775 }
776
777 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
782 self.path(path_id).and_then(|path| path.remote_status())
783 }
784
785 pub fn set_path_max_idle_timeout(
791 &mut self,
792 path_id: PathId,
793 timeout: Option<Duration>,
794 ) -> Result<Option<Duration>, ClosedPath> {
795 let path = self
796 .paths
797 .get_mut(&path_id)
798 .ok_or(ClosedPath { _private: () })?;
799 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
800 }
801
802 pub fn set_path_keep_alive_interval(
808 &mut self,
809 path_id: PathId,
810 interval: Option<Duration>,
811 ) -> Result<Option<Duration>, ClosedPath> {
812 let path = self
813 .paths
814 .get_mut(&path_id)
815 .ok_or(ClosedPath { _private: () })?;
816 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
817 }
818
819 #[track_caller]
823 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
824 &mut self.paths.get_mut(&path_id).expect("known path").data
825 }
826
827 fn ensure_path(
828 &mut self,
829 path_id: PathId,
830 remote: SocketAddr,
831 now: Instant,
832 pn: Option<u64>,
833 rtt_hint: Option<Duration>,
834 ) -> &mut PathData {
835 let vacant_entry = match self.paths.entry(path_id) {
836 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
837 btree_map::Entry::Occupied(occupied_entry) => {
838 return &mut occupied_entry.into_mut().data;
839 }
840 };
841
842 debug!(%path_id, ?remote, "path added");
845 let peer_max_udp_payload_size =
846 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
847 self.path_counter = self.path_counter.wrapping_add(1);
848 let mut data = PathData::new(
849 remote,
850 self.allow_mtud,
851 Some(peer_max_udp_payload_size),
852 self.path_counter,
853 now,
854 &self.config,
855 );
856
857 let pto = match rtt_hint {
858 Some(rtt_hint) => rtt_hint,
859 None => {
860 self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base()
863 }
864 };
865 self.timers.set(
866 Timer::PerPath(path_id, PathTimer::PathOpen),
867 now + 3 * pto,
868 self.qlog.with_time(now),
869 );
870
871 data.send_new_challenge = true;
874
875 let path = vacant_entry.insert(PathState { data, prev: None });
876
877 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
878 if let Some(pn) = pn {
879 pn_space.dedup.insert(pn);
880 }
881 self.spaces[SpaceId::Data]
882 .number_spaces
883 .insert(path_id, pn_space);
884 self.qlog.emit_tuple_assigned(path_id, remote, now);
885 &mut path.data
886 }
887
888 #[must_use]
898 pub fn poll_transmit(
899 &mut self,
900 now: Instant,
901 max_datagrams: usize,
902 buf: &mut Vec<u8>,
903 ) -> Option<Transmit> {
904 if let Some(probing) = self
905 .iroh_hp
906 .server_side_mut()
907 .ok()
908 .and_then(iroh_hp::ServerState::next_probe)
909 {
910 let destination = probing.remote();
911 trace!(%destination, "RAND_DATA packet");
912 let token: u64 = self.rng.random();
913 buf.put_u64(token);
914 probing.finish(token);
915 return Some(Transmit {
916 destination,
917 ecn: None,
918 size: 8,
919 segment_size: None,
920 src_ip: None,
921 });
922 }
923
924 assert!(max_datagrams != 0);
925 let max_datagrams = match self.config.enable_segmentation_offload {
926 false => 1,
927 true => max_datagrams,
928 };
929
930 let close = match self.state.as_type() {
949 StateType::Drained => {
950 self.app_limited = true;
951 return None;
952 }
953 StateType::Draining | StateType::Closed => {
954 if !self.close {
957 self.app_limited = true;
958 return None;
959 }
960 true
961 }
962 _ => false,
963 };
964
965 if let Some(config) = &self.config.ack_frequency_config {
967 let rtt = self
968 .paths
969 .values()
970 .map(|p| p.data.rtt.get())
971 .min()
972 .expect("one path exists");
973 self.spaces[SpaceId::Data].pending.ack_frequency = self
974 .ack_frequency
975 .should_send_ack_frequency(rtt, config, &self.peer_params)
976 && self.highest_space == SpaceId::Data
977 && self.peer_supports_ack_frequency();
978 }
979
980 let mut coalesce = true;
982
983 let mut pad_datagram = PadDatagram::No;
986
987 let mut congestion_blocked = false;
991
992 let mut last_packet_number = None;
994
995 let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
996
997 let have_available_path = self.paths.iter().any(|(id, path)| {
1000 path.data.validated
1001 && path.data.local_status() == PathStatus::Available
1002 && self.rem_cids.contains_key(id)
1003 });
1004
1005 let mut transmit = TransmitBuf::new(
1007 buf,
1008 max_datagrams,
1009 self.path_data(path_id).current_mtu().into(),
1010 );
1011 if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1012 return Some(challenge);
1013 }
1014 let mut space_id = match path_id {
1015 PathId::ZERO => SpaceId::Initial,
1016 _ => SpaceId::Data,
1017 };
1018
1019 loop {
1020 let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1022 let err = PathError::RemoteCidsExhausted;
1023 if !self.abandoned_paths.contains(&path_id) {
1024 debug!(?err, %path_id, "no active CID for path");
1025 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1026 id: path_id,
1027 error: err,
1028 }));
1029 self.close_path(
1033 now,
1034 path_id,
1035 TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1036 )
1037 .ok();
1038 self.spaces[SpaceId::Data]
1039 .pending
1040 .path_cids_blocked
1041 .push(path_id);
1042 } else {
1043 trace!(%path_id, "remote CIDs retired for abandoned path");
1044 }
1045
1046 match self.paths.keys().find(|&&next| next > path_id) {
1047 Some(next_path_id) => {
1048 path_id = *next_path_id;
1050 space_id = SpaceId::Data;
1051
1052 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1054 if let Some(challenge) =
1055 self.send_prev_path_challenge(now, &mut transmit, path_id)
1056 {
1057 return Some(challenge);
1058 }
1059
1060 continue;
1061 }
1062 None => {
1063 trace!(
1065 ?space_id,
1066 %path_id,
1067 "no CIDs to send on path, no more paths"
1068 );
1069 break;
1070 }
1071 }
1072 };
1073
1074 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1077 transmit.datagram_remaining_mut()
1079 } else {
1080 transmit.segment_size()
1082 };
1083 let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1084 let path_should_send = {
1085 let path_exclusive_only = space_id == SpaceId::Data
1086 && have_available_path
1087 && self.path_data(path_id).local_status() == PathStatus::Backup;
1088 let path_should_send = if path_exclusive_only {
1089 can_send.path_exclusive
1090 } else {
1091 !can_send.is_empty()
1092 };
1093 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1094 path_should_send || needs_loss_probe || can_send.close
1095 };
1096
1097 if !path_should_send && space_id < SpaceId::Data {
1098 if self.spaces[space_id].crypto.is_some() {
1099 trace!(?space_id, %path_id, "nothing to send in space");
1100 }
1101 space_id = space_id.next();
1102 continue;
1103 }
1104
1105 let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1106 self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1108 } else {
1109 PathBlocked::No
1110 };
1111 if send_blocked != PathBlocked::No {
1112 trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1113 congestion_blocked = true;
1114 }
1115 if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1116 space_id = space_id.next();
1119 continue;
1120 }
1121 if !path_should_send || send_blocked != PathBlocked::No {
1122 if transmit.num_datagrams() > 0 {
1127 break;
1128 }
1129
1130 match self.paths.keys().find(|&&next| next > path_id) {
1131 Some(next_path_id) => {
1132 trace!(
1134 ?space_id,
1135 %path_id,
1136 %next_path_id,
1137 "nothing to send on path"
1138 );
1139 path_id = *next_path_id;
1140 space_id = SpaceId::Data;
1141
1142 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1144 if let Some(challenge) =
1145 self.send_prev_path_challenge(now, &mut transmit, path_id)
1146 {
1147 return Some(challenge);
1148 }
1149
1150 continue;
1151 }
1152 None => {
1153 trace!(
1155 ?space_id,
1156 %path_id,
1157 next_path_id=?None::<PathId>,
1158 "nothing to send on path"
1159 );
1160 break;
1161 }
1162 }
1163 }
1164
1165 if transmit.datagram_remaining_mut() == 0 {
1167 if transmit.num_datagrams() >= transmit.max_datagrams() {
1168 break;
1170 }
1171
1172 match self.spaces[space_id].for_path(path_id).loss_probes {
1173 0 => transmit.start_new_datagram(),
1174 _ => {
1175 let request_immediate_ack =
1177 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1178 self.spaces[space_id].maybe_queue_probe(
1179 path_id,
1180 request_immediate_ack,
1181 &self.streams,
1182 );
1183
1184 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1185
1186 transmit.start_new_datagram_with_size(std::cmp::min(
1190 usize::from(INITIAL_MTU),
1191 transmit.segment_size(),
1192 ));
1193 }
1194 }
1195 trace!(count = transmit.num_datagrams(), "new datagram started");
1196 coalesce = true;
1197 pad_datagram = PadDatagram::No;
1198 }
1199
1200 if transmit.datagram_start_offset() < transmit.len() {
1203 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1204 }
1205
1206 if self.spaces[SpaceId::Initial].crypto.is_some()
1211 && space_id == SpaceId::Handshake
1212 && self.side.is_client()
1213 {
1214 self.discard_space(now, SpaceId::Initial);
1217 }
1218 if let Some(ref mut prev) = self.prev_crypto {
1219 prev.update_unacked = false;
1220 }
1221
1222 let mut qlog = QlogSentPacket::default();
1223 let mut builder = PacketBuilder::new(
1224 now,
1225 space_id,
1226 path_id,
1227 remote_cid,
1228 &mut transmit,
1229 can_send.other,
1230 self,
1231 &mut qlog,
1232 )?;
1233 last_packet_number = Some(builder.exact_number);
1234 coalesce = coalesce && !builder.short_header;
1235
1236 if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1237 pad_datagram |= PadDatagram::ToMinMtu;
1239 }
1240 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1241 pad_datagram |= PadDatagram::ToSegmentSize;
1242 }
1243
1244 if can_send.close {
1245 trace!("sending CONNECTION_CLOSE");
1246 let mut sent_frames = SentFrames::default();
1251 let is_multipath_negotiated = self.is_multipath_negotiated();
1252 for path_id in self.spaces[space_id]
1253 .number_spaces
1254 .iter()
1255 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1256 .map(|(&path_id, _)| path_id)
1257 .collect::<Vec<_>>()
1258 {
1259 Self::populate_acks(
1260 now,
1261 self.receiving_ecn,
1262 &mut sent_frames,
1263 path_id,
1264 space_id,
1265 &mut self.spaces[space_id],
1266 is_multipath_negotiated,
1267 &mut builder.frame_space_mut(),
1268 &mut self.stats,
1269 &mut qlog,
1270 );
1271 }
1272
1273 debug_assert!(
1277 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1278 "ACKs should leave space for ConnectionClose"
1279 );
1280 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1281 let max_frame_size = builder.frame_space_remaining();
1282 match self.state.as_type() {
1283 StateType::Closed => {
1284 let reason: Close =
1285 self.state.as_closed().expect("checked").clone().into();
1286 if space_id == SpaceId::Data || reason.is_transport_layer() {
1287 reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1288 qlog.frame(&Frame::Close(reason));
1289 } else {
1290 let frame = frame::ConnectionClose {
1291 error_code: TransportErrorCode::APPLICATION_ERROR,
1292 frame_type: None,
1293 reason: Bytes::new(),
1294 };
1295 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1296 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1297 }
1298 }
1299 StateType::Draining => {
1300 let frame = frame::ConnectionClose {
1301 error_code: TransportErrorCode::NO_ERROR,
1302 frame_type: None,
1303 reason: Bytes::new(),
1304 };
1305 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1306 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1307 }
1308 _ => unreachable!(
1309 "tried to make a close packet when the connection wasn't closed"
1310 ),
1311 };
1312 }
1313 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1314 if space_id == self.highest_space {
1315 self.close = false;
1318 break;
1320 } else {
1321 space_id = space_id.next();
1325 continue;
1326 }
1327 }
1328
1329 if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1332 let path = self.path_data_mut(path_id);
1333 if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1334 let response = frame::PathResponse(token);
1338 trace!(%response, "(off-path)");
1339 builder.frame_space_mut().write(response);
1340 qlog.frame(&Frame::PathResponse(response));
1341 self.stats.frame_tx.path_response += 1;
1342 builder.finish_and_track(
1343 now,
1344 self,
1345 path_id,
1346 SentFrames {
1347 non_retransmits: true,
1348 ..SentFrames::default()
1349 },
1350 PadDatagram::ToMinMtu,
1351 qlog,
1352 );
1353 self.stats.udp_tx.on_sent(1, transmit.len());
1354 return Some(Transmit {
1355 destination: remote,
1356 size: transmit.len(),
1357 ecn: None,
1358 segment_size: None,
1359 src_ip: self.local_ip,
1360 });
1361 }
1362 }
1363
1364 let sent_frames = {
1365 let path_exclusive_only = have_available_path
1366 && self.path_data(path_id).local_status() == PathStatus::Backup;
1367 let pn = builder.exact_number;
1368 self.populate_packet(
1369 now,
1370 space_id,
1371 path_id,
1372 path_exclusive_only,
1373 &mut builder.frame_space_mut(),
1374 pn,
1375 &mut qlog,
1376 )
1377 };
1378
1379 debug_assert!(
1386 !(sent_frames.is_ack_only(&self.streams)
1387 && !can_send.acks
1388 && can_send.other
1389 && builder.buf.segment_size()
1390 == self.path_data(path_id).current_mtu() as usize
1391 && self.datagrams.outgoing.is_empty()),
1392 "SendableFrames was {can_send:?}, but only ACKs have been written"
1393 );
1394 if sent_frames.requires_padding {
1395 pad_datagram |= PadDatagram::ToMinMtu;
1396 }
1397
1398 for (path_id, _pn) in sent_frames.largest_acked.iter() {
1399 self.spaces[space_id]
1400 .for_path(*path_id)
1401 .pending_acks
1402 .acks_sent();
1403 self.timers.stop(
1404 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1405 self.qlog.with_time(now),
1406 );
1407 }
1408
1409 if coalesce
1417 && builder
1418 .buf
1419 .datagram_remaining_mut()
1420 .saturating_sub(builder.predict_packet_end())
1421 > MIN_PACKET_SPACE
1422 && self
1423 .next_send_space(space_id, path_id, builder.buf, close)
1424 .is_some()
1425 {
1426 builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1429 } else {
1430 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1433 const MAX_PADDING: usize = 32;
1441 if builder.buf.datagram_remaining_mut()
1442 > builder.predict_packet_end() + MAX_PADDING
1443 {
1444 trace!(
1445 "GSO truncated by demand for {} padding bytes",
1446 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1447 );
1448 builder.finish_and_track(
1449 now,
1450 self,
1451 path_id,
1452 sent_frames,
1453 PadDatagram::No,
1454 qlog,
1455 );
1456 break;
1457 }
1458
1459 builder.finish_and_track(
1462 now,
1463 self,
1464 path_id,
1465 sent_frames,
1466 PadDatagram::ToSegmentSize,
1467 qlog,
1468 );
1469 } else {
1470 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1471 }
1472 if transmit.num_datagrams() == 1 {
1473 transmit.clip_datagram_size();
1474 }
1475 }
1476 }
1477
1478 if let Some(last_packet_number) = last_packet_number {
1479 self.path_data_mut(path_id).congestion.on_sent(
1482 now,
1483 transmit.len() as u64,
1484 last_packet_number,
1485 );
1486 }
1487
1488 self.qlog.emit_recovery_metrics(
1489 path_id,
1490 &mut self.paths.get_mut(&path_id).unwrap().data,
1491 now,
1492 );
1493
1494 self.app_limited = transmit.is_empty() && !congestion_blocked;
1495
1496 if transmit.is_empty() && self.state.is_established() {
1498 let space_id = SpaceId::Data;
1500 path_id = *self.paths.first_key_value().expect("one path must exist").0;
1501 let probe_data = loop {
1502 let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1508 let eligible = self.path_data(path_id).validated
1509 && !self.path_data(path_id).is_validating_path()
1510 && !self.abandoned_paths.contains(&path_id);
1511 let probe_size = eligible
1512 .then(|| {
1513 let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1514 self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1515 })
1516 .flatten();
1517 match (active_cid, probe_size) {
1518 (Some(active_cid), Some(probe_size)) => {
1519 break Some((active_cid, probe_size));
1521 }
1522 _ => {
1523 match self.paths.keys().find(|&&next| next > path_id) {
1525 Some(next) => {
1526 path_id = *next;
1527 continue;
1528 }
1529 None => break None,
1530 }
1531 }
1532 }
1533 };
1534 if let Some((active_cid, probe_size)) = probe_data {
1535 debug_assert_eq!(transmit.num_datagrams(), 0);
1537 transmit.start_new_datagram_with_size(probe_size as usize);
1538
1539 let mut qlog = QlogSentPacket::default();
1540 let mut builder = PacketBuilder::new(
1541 now,
1542 space_id,
1543 path_id,
1544 active_cid,
1545 &mut transmit,
1546 true,
1547 self,
1548 &mut qlog,
1549 )?;
1550
1551 trace!(?probe_size, "writing MTUD probe");
1553 trace!("PING");
1554 builder.frame_space_mut().write(frame::FrameType::PING);
1555 qlog.frame(&Frame::Ping);
1556 self.stats.frame_tx.ping += 1;
1557
1558 if self.peer_supports_ack_frequency() {
1560 trace!("IMMEDIATE_ACK");
1561 builder
1562 .frame_space_mut()
1563 .write(frame::FrameType::IMMEDIATE_ACK);
1564 self.stats.frame_tx.immediate_ack += 1;
1565 qlog.frame(&Frame::ImmediateAck);
1566 }
1567
1568 let sent_frames = SentFrames {
1569 non_retransmits: true,
1570 ..Default::default()
1571 };
1572 builder.finish_and_track(
1573 now,
1574 self,
1575 path_id,
1576 sent_frames,
1577 PadDatagram::ToSize(probe_size),
1578 qlog,
1579 );
1580
1581 self.path_stats
1582 .entry(path_id)
1583 .or_default()
1584 .sent_plpmtud_probes += 1;
1585 }
1586 }
1587
1588 if transmit.is_empty() {
1589 return None;
1590 }
1591
1592 let destination = self.path_data(path_id).remote;
1593 trace!(
1594 segment_size = transmit.segment_size(),
1595 last_datagram_len = transmit.len() % transmit.segment_size(),
1596 ?destination,
1597 "sending {} bytes in {} datagrams",
1598 transmit.len(),
1599 transmit.num_datagrams()
1600 );
1601 self.path_data_mut(path_id)
1602 .inc_total_sent(transmit.len() as u64);
1603
1604 self.stats
1605 .udp_tx
1606 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1607
1608 Some(Transmit {
1609 destination,
1610 size: transmit.len(),
1611 ecn: if self.path_data(path_id).sending_ecn {
1612 Some(EcnCodepoint::Ect0)
1613 } else {
1614 None
1615 },
1616 segment_size: match transmit.num_datagrams() {
1617 1 => None,
1618 _ => Some(transmit.segment_size()),
1619 },
1620 src_ip: self.local_ip,
1621 })
1622 }
1623
1624 fn next_send_space(
1629 &mut self,
1630 current_space_id: SpaceId,
1631 path_id: PathId,
1632 buf: &TransmitBuf<'_>,
1633 close: bool,
1634 ) -> Option<SpaceId> {
1635 let mut space_id = current_space_id;
1642 loop {
1643 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1644 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1645 return Some(space_id);
1646 }
1647 space_id = match space_id {
1648 SpaceId::Initial => SpaceId::Handshake,
1649 SpaceId::Handshake => SpaceId::Data,
1650 SpaceId::Data => break,
1651 }
1652 }
1653 None
1654 }
1655
1656 fn path_congestion_check(
1658 &mut self,
1659 space_id: SpaceId,
1660 path_id: PathId,
1661 transmit: &TransmitBuf<'_>,
1662 can_send: &SendableFrames,
1663 now: Instant,
1664 ) -> PathBlocked {
1665 if self.side().is_server()
1671 && self
1672 .path_data(path_id)
1673 .anti_amplification_blocked(transmit.len() as u64 + 1)
1674 {
1675 trace!(?space_id, %path_id, "blocked by anti-amplification");
1676 return PathBlocked::AntiAmplification;
1677 }
1678
1679 let bytes_to_send = transmit.segment_size() as u64;
1682 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1683
1684 if can_send.other && !need_loss_probe && !can_send.close {
1685 let path = self.path_data(path_id);
1686 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1687 trace!(?space_id, %path_id, "blocked by congestion control");
1688 return PathBlocked::Congestion;
1689 }
1690 }
1691
1692 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1694 self.timers.set(
1695 Timer::PerPath(path_id, PathTimer::Pacing),
1696 delay,
1697 self.qlog.with_time(now),
1698 );
1699 trace!(?space_id, %path_id, "blocked by pacing");
1702 return PathBlocked::Pacing;
1703 }
1704
1705 PathBlocked::No
1706 }
1707
1708 fn send_prev_path_challenge(
1713 &mut self,
1714 now: Instant,
1715 buf: &mut TransmitBuf<'_>,
1716 path_id: PathId,
1717 ) -> Option<Transmit> {
1718 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1719 if !prev_path.send_new_challenge {
1722 return None;
1723 };
1724 prev_path.send_new_challenge = false;
1725 let destination = prev_path.remote;
1726 let token = self.rng.random();
1727 let info = paths::SentChallengeInfo {
1728 sent_instant: now,
1729 remote: destination,
1730 };
1731 prev_path.challenges_sent.insert(token, info);
1732 debug_assert_eq!(
1733 self.highest_space,
1734 SpaceId::Data,
1735 "PATH_CHALLENGE queued without 1-RTT keys"
1736 );
1737 buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1738
1739 debug_assert_eq!(buf.datagram_start_offset(), 0);
1745 let mut qlog = QlogSentPacket::default();
1746 let mut builder = PacketBuilder::new(
1747 now,
1748 SpaceId::Data,
1749 path_id,
1750 *prev_cid,
1751 buf,
1752 false,
1753 self,
1754 &mut qlog,
1755 )?;
1756 let challenge = frame::PathChallenge(token);
1757 trace!(%challenge, "validating previous path");
1758 qlog.frame(&Frame::PathChallenge(challenge));
1759 builder.frame_space_mut().write(challenge);
1760 self.stats.frame_tx.path_challenge += 1;
1761
1762 builder.pad_to(MIN_INITIAL_SIZE);
1767
1768 builder.finish(self, now, qlog);
1769 self.stats.udp_tx.on_sent(1, buf.len());
1770
1771 Some(Transmit {
1772 destination,
1773 size: buf.len(),
1774 ecn: None,
1775 segment_size: None,
1776 src_ip: self.local_ip,
1777 })
1778 }
1779
1780 fn space_can_send(
1785 &mut self,
1786 space_id: SpaceId,
1787 path_id: PathId,
1788 packet_size: usize,
1789 close: bool,
1790 ) -> SendableFrames {
1791 let pn = self.spaces[SpaceId::Data]
1792 .for_path(path_id)
1793 .peek_tx_number();
1794 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1795 if self.spaces[space_id].crypto.is_none()
1796 && (space_id != SpaceId::Data
1797 || self.zero_rtt_crypto.is_none()
1798 || self.side.is_server())
1799 {
1800 return SendableFrames::empty();
1802 }
1803 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1804 if space_id == SpaceId::Data {
1805 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1806 }
1807
1808 can_send.close = close && self.spaces[space_id].crypto.is_some();
1809
1810 can_send
1811 }
1812
1813 pub fn handle_event(&mut self, event: ConnectionEvent) {
1819 use ConnectionEventInner::*;
1820 match event.0 {
1821 Datagram(DatagramConnectionEvent {
1822 now,
1823 remote,
1824 path_id,
1825 ecn,
1826 first_decode,
1827 remaining,
1828 }) => {
1829 let span = trace_span!("pkt", %path_id);
1830 let _guard = span.enter();
1831 if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
1835 if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
1836 trace!(
1837 %path_id,
1838 ?remote,
1839 path_remote = ?self.path(path_id).map(|p| p.remote),
1840 "discarding packet from unrecognized peer"
1841 );
1842 return;
1843 }
1844 }
1845
1846 let was_anti_amplification_blocked = self
1847 .path(path_id)
1848 .map(|path| path.anti_amplification_blocked(1))
1849 .unwrap_or(true); self.stats.udp_rx.datagrams += 1;
1853 self.stats.udp_rx.bytes += first_decode.len() as u64;
1854 let data_len = first_decode.len();
1855
1856 self.handle_decode(now, remote, path_id, ecn, first_decode);
1857 if let Some(path) = self.path_mut(path_id) {
1862 path.inc_total_recvd(data_len as u64);
1863 }
1864
1865 if let Some(data) = remaining {
1866 self.stats.udp_rx.bytes += data.len() as u64;
1867 self.handle_coalesced(now, remote, path_id, ecn, data);
1868 }
1869
1870 if let Some(path) = self.paths.get_mut(&path_id) {
1871 self.qlog
1872 .emit_recovery_metrics(path_id, &mut path.data, now);
1873 }
1874
1875 if was_anti_amplification_blocked {
1876 self.set_loss_detection_timer(now, path_id);
1880 }
1881 }
1882 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1883 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1884 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1885 let cid_state = self
1886 .local_cid_state
1887 .entry(path_id)
1888 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1889 cid_state.new_cids(&ids, now);
1890
1891 ids.into_iter().rev().for_each(|frame| {
1892 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1893 });
1894 self.reset_cid_retirement(now);
1896 }
1897 }
1898 }
1899
/// Processes every timer that has expired by `now`.
///
/// Expired timers are taken from `self.timers` one at a time and dispatched on
/// their kind: connection-wide timers (`Timer::Conn`) and timers scoped to a
/// single path (`Timer::PerPath`).
pub fn handle_timeout(&mut self, now: Instant) {
    while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
        trace!(?timer, at=?now, "timeout");
        match timer {
            Timer::Conn(timer) => match timer {
                ConnTimer::Close => {
                    // Close period is over: move to drained and notify the endpoint.
                    self.state.move_to_drained(None);
                    self.endpoint_events.push_back(EndpointEventInner::Drained);
                }
                ConnTimer::Idle => {
                    self.kill(ConnectionError::TimedOut);
                }
                ConnTimer::KeepAlive => {
                    trace!("sending keep-alive");
                    self.ping();
                }
                ConnTimer::KeyDiscard => {
                    // Drop both the 0-RTT keys and the previous key generation.
                    self.zero_rtt_crypto = None;
                    self.prev_crypto = None;
                }
                ConnTimer::PushNewCid => {
                    // Handle every CID retirement that is due by `now`.
                    while let Some((path_id, when)) = self.next_cid_retirement() {
                        if when > now {
                            break;
                        }
                        match self.local_cid_state.get_mut(&path_id) {
                            None => error!(%path_id, "No local CID state for path"),
                            Some(cid_state) => {
                                // The timeout result is converted into the number of
                                // replacement CIDs to request from the endpoint.
                                let num_new_cid = cid_state.on_cid_timeout().into();
                                if !self.state.is_closed() {
                                    trace!(
                                        "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                        cid_state.retire_prior_to()
                                    );
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::NeedIdentifiers(
                                            path_id,
                                            now,
                                            num_new_cid,
                                        ),
                                    );
                                }
                            }
                        }
                    }
                }
            },
            Timer::PerPath(path_id, timer) => {
                let span = trace_span!("per-path timer fired", %path_id, ?timer);
                let _guard = span.enter();
                match timer {
                    PathTimer::PathIdle => {
                        // Close the idle path; a failure to do so is ignored.
                        self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
                            .ok();
                    }

                    PathTimer::PathKeepAlive => {
                        trace!("sending keep-alive on path");
                        // A failure (path no longer known) is ignored.
                        self.ping_path(path_id).ok();
                    }
                    PathTimer::LossDetection => {
                        self.on_loss_detection_timeout(now, path_id);
                        // NOTE: panics via `unwrap` if the path is no longer present.
                        self.qlog.emit_recovery_metrics(
                            path_id,
                            &mut self.paths.get_mut(&path_id).unwrap().data,
                            now,
                        );
                    }
                    PathTimer::PathValidation => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        self.timers.stop(
                            Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                            self.qlog.with_time(now),
                        );
                        debug!("path validation failed");
                        // Fall back to the previous path data, if one was kept.
                        if let Some((_, prev)) = path.prev.take() {
                            path.data = prev;
                        }
                        path.data.challenges_sent.clear();
                        path.data.send_new_challenge = false;
                    }
                    PathTimer::PathChallengeLost => {
                        let Some(path) = self.paths.get_mut(&path_id) else {
                            continue;
                        };
                        trace!("path challenge deemed lost");
                        // Request that a fresh challenge be sent.
                        path.data.send_new_challenge = true;
                    }
                    PathTimer::PathOpen => {
                        let Some(path) = self.path_mut(path_id) else {
                            continue;
                        };
                        path.challenges_sent.clear();
                        path.send_new_challenge = false;
                        debug!("new path validation failed");
                        if let Err(err) = self.close_path(
                            now,
                            path_id,
                            TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
                        ) {
                            warn!(?err, "failed closing path");
                        }

                        // The event is emitted whether or not `close_path` succeeded.
                        self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                            id: path_id,
                            error: PathError::ValidationFailed,
                        }));
                    }
                    PathTimer::Pacing => trace!("pacing timer expired"),
                    PathTimer::MaxAckDelay => {
                        trace!("max ack delay reached");
                        // Only the Data space's pending ACKs are notified here.
                        self.spaces[SpaceId::Data]
                            .for_path(path_id)
                            .pending_acks
                            .on_max_ack_delay_timeout()
                    }
                    PathTimer::DiscardPath => {
                        // Stop all timers of the path, retire every active local CID
                        // sequence number of the path, then drop the path state.
                        self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                        if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
                            let (min_seq, max_seq) = loc_cid_state.active_seq();
                            for seq in min_seq..=max_seq {
                                self.endpoint_events.push_back(
                                    EndpointEventInner::RetireConnectionId(
                                        now, path_id, seq, false,
                                    ),
                                );
                            }
                        }
                        self.discard_path(path_id, now);
                    }
                }
            }
        }
    }
}
2052
2053 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2065 self.close_inner(
2066 now,
2067 Close::Application(frame::ApplicationClose { error_code, reason }),
2068 )
2069 }
2070
2071 fn close_inner(&mut self, now: Instant, reason: Close) {
2072 let was_closed = self.state.is_closed();
2073 if !was_closed {
2074 self.close_common();
2075 self.set_close_timer(now);
2076 self.close = true;
2077 self.state.move_to_closed_local(reason);
2078 }
2079 }
2080
/// Returns a [`Datagrams`] handle that mutably borrows this connection.
pub fn datagrams(&mut self) -> Datagrams<'_> {
    Datagrams { conn: self }
}
2085
/// Returns a clone of the connection-level statistics.
pub fn stats(&mut self) -> ConnectionStats {
    self.stats.clone()
}
2090
2091 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2093 let path = self.paths.get(&path_id)?;
2094 let stats = self.path_stats.entry(path_id).or_default();
2095 stats.rtt = path.data.rtt.get();
2096 stats.cwnd = path.data.congestion.window();
2097 stats.current_mtu = path.data.mtud.current_mtu();
2098 Some(*stats)
2099 }
2100
2101 pub fn ping(&mut self) {
2105 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2108 path_data.ping_pending = true;
2109 }
2110 }
2111
2112 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2116 let path_data = self.spaces[self.highest_space]
2117 .number_spaces
2118 .get_mut(&path)
2119 .ok_or(ClosedPath { _private: () })?;
2120 path_data.ping_pending = true;
2121 Ok(())
2122 }
2123
2124 pub fn force_key_update(&mut self) {
2128 if !self.state.is_established() {
2129 debug!("ignoring forced key update in illegal state");
2130 return;
2131 }
2132 if self.prev_crypto.is_some() {
2133 debug!("ignoring redundant forced key update");
2136 return;
2137 }
2138 self.update_keys(None, false);
2139 }
2140
/// Deprecated alias that forwards to `force_key_update`.
#[doc(hidden)]
#[deprecated]
pub fn initiate_key_update(&mut self) {
    self.force_key_update();
}
2147
/// Returns a reference to the connection's crypto session.
pub fn crypto_session(&self) -> &dyn crypto::Session {
    &*self.crypto
}
2152
/// Returns `true` while the connection state reports that it is in the handshake.
pub fn is_handshaking(&self) -> bool {
    self.state.is_handshake()
}
2160
/// Returns `true` if the connection state reports that it is closed.
pub fn is_closed(&self) -> bool {
    self.state.is_closed()
}
2171
/// Returns `true` if the connection state reports that it is drained.
pub fn is_drained(&self) -> bool {
    self.state.is_drained()
}
2179
/// Returns the stored `accepted_0rtt` flag.
pub fn accepted_0rtt(&self) -> bool {
    self.accepted_0rtt
}
2186
/// Returns the stored `zero_rtt_enabled` flag.
pub fn has_0rtt(&self) -> bool {
    self.zero_rtt_enabled
}
2191
/// Returns `true` if the Data space's pending set is not empty, as judged by
/// `pending.is_empty` given the current stream state.
pub fn has_pending_retransmits(&self) -> bool {
    !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
}
2196
/// Returns the [`Side`] of this connection.
pub fn side(&self) -> Side {
    self.side.side()
}
2201
2202 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2204 self.path(path_id)
2205 .map(|path_data| {
2206 path_data
2207 .last_observed_addr_report
2208 .as_ref()
2209 .map(|observed| observed.socket_addr())
2210 })
2211 .ok_or(ClosedPath { _private: () })
2212 }
2213
/// Returns the stored local IP address, if one is set.
pub fn local_ip(&self) -> Option<IpAddr> {
    self.local_ip
}
2226
2227 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2229 self.path(path_id).map(|d| d.rtt.get())
2230 }
2231
2232 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2234 self.path(path_id).map(|d| d.congestion.as_ref())
2235 }
2236
2237 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2242 self.streams.set_max_concurrent(dir, count);
2243 let pending = &mut self.spaces[SpaceId::Data].pending;
2246 self.streams.queue_max_stream_id(pending);
2247 }
2248
2249 pub fn set_max_concurrent_paths(
2259 &mut self,
2260 now: Instant,
2261 count: NonZeroU32,
2262 ) -> Result<(), MultipathNotNegotiated> {
2263 if !self.is_multipath_negotiated() {
2264 return Err(MultipathNotNegotiated { _private: () });
2265 }
2266 self.max_concurrent_paths = count;
2267
2268 let in_use_count = self
2269 .local_max_path_id
2270 .next()
2271 .saturating_sub(self.abandoned_paths.len() as u32)
2272 .as_u32();
2273 let extra_needed = count.get().saturating_sub(in_use_count);
2274 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2275
2276 self.set_max_path_id(now, new_max_path_id);
2277
2278 Ok(())
2279 }
2280
2281 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2283 if max_path_id <= self.local_max_path_id {
2284 return;
2285 }
2286
2287 self.local_max_path_id = max_path_id;
2288 self.spaces[SpaceId::Data].pending.max_path_id = true;
2289
2290 self.issue_first_path_cids(now);
2291 }
2292
/// Returns the concurrent-stream limit that the stream state reports for `dir`.
pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
    self.streams.max_concurrent(dir)
}
2301
/// Forwards `send_window` to the stream state's `set_send_window`.
pub fn set_send_window(&mut self, send_window: u64) {
    self.streams.set_send_window(send_window);
}
2306
2307 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2309 if self.streams.set_receive_window(receive_window) {
2310 self.spaces[SpaceId::Data].pending.max_data = true;
2311 }
2312 }
2313
2314 pub fn is_multipath_negotiated(&self) -> bool {
2319 !self.is_handshaking()
2320 && self.config.max_concurrent_multipath_paths.is_some()
2321 && self.peer_params.initial_max_path_id.is_some()
2322 }
2323
2324 fn on_ack_received(
2325 &mut self,
2326 now: Instant,
2327 space: SpaceId,
2328 ack: frame::Ack,
2329 ) -> Result<(), TransportError> {
2330 let path = PathId::ZERO;
2332 self.inner_on_ack_received(now, space, path, ack)
2333 }
2334
2335 fn on_path_ack_received(
2336 &mut self,
2337 now: Instant,
2338 space: SpaceId,
2339 path_ack: frame::PathAck,
2340 ) -> Result<(), TransportError> {
2341 let (ack, path) = path_ack.into_ack();
2342 self.inner_on_ack_received(now, space, path, ack)
2343 }
2344
/// Processes an acknowledgement for packets sent in `space` on `path`.
///
/// ACKs for abandoned paths are ignored. Otherwise the newly acknowledged
/// packets are removed from the sent-packet tracking and reported to MTU
/// discovery, ACK frequency and the congestion controller. The RTT estimate is
/// updated when a new largest, ack-eliciting packet was acknowledged. Loss
/// detection runs, ECN feedback is evaluated, and the loss detection timer is
/// re-armed.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` if the ACK covers a packet number that has not
/// been sent yet, and propagates any error from `check_ack`.
fn inner_on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    path: PathId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if self.abandoned_paths.contains(&path) {
        trace!("silently ignoring PATH_ACK on abandoned path");
        return Ok(());
    }
    if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Track whether this ACK advances the largest acknowledged packet number.
    let new_largest = {
        let space = &mut self.spaces[space].for_path(path);
        if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
            space.largest_acked_packet = Some(ack.largest);
            // The send time is only updated if the packet is still tracked.
            if let Some(info) = space.sent_packets.get(ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    if self.detect_spurious_loss(&ack, space, path) {
        self.path_data_mut(path)
            .congestion
            .on_spurious_congestion_event();
    }

    // Collect only packet numbers that are still tracked in `sent_packets`, so
    // the work below is bounded by what is tracked, not by the ACK range sizes.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.spaces[space].for_path(path).check_ack(range.clone())?;
        for (pn, _) in self.spaces[space]
            .for_path(path)
            .sent_packets
            .iter_range(range)
        {
            newly_acked.insert_one(pn);
        }
    }

    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].for_path(path).take(packet) {
            // The acked packet recorded the largest packet numbers it acknowledged
            // per path; pending ACKs below those are dropped.
            for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                    pns.pending_acks.subtract_below(*acked_pn);
                }
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // Let MTU discovery see the ACK and forward any MTU change to the
            // congestion controller.
            let path_data = self.path_data_mut(path);
            let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
            if mtu_updated {
                path_data
                    .congestion
                    .on_mtu_update(path_data.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(path, packet);

            self.on_packet_acked(now, path, info);
        }
    }

    let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
    let app_limited = self.app_limited;
    let path_data = self.path_data_mut(path);
    let in_flight = path_data.in_flight.bytes;

    path_data
        .congestion
        .on_end_acks(now, in_flight, app_limited, largest_ackd);

    if new_largest && ack_eliciting_acked {
        // The peer-reported ACK delay is only used in the Data space, and is
        // capped at the peer's maximum ACK delay.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = now.saturating_duration_since(
            self.spaces[space].for_path(path).largest_acked_packet_sent,
        );

        let next_pn = self.spaces[space].for_path(path).next_packet_number;
        let path_data = self.path_data_mut(path);
        path_data.rtt.update(ack_delay, rtt);
        // Remember the first packet number to be sent after the first RTT sample.
        if path_data.first_packet_after_rtt_sample.is_none() {
            path_data.first_packet_after_rtt_sample = Some((space, next_pn));
        }
    }

    // Runs before `pto_count` is reset below.
    self.detect_lost_packets(now, space, path, true);

    if self.peer_completed_address_validation(path) {
        self.path_data_mut(path).pto_count = 0;
    }

    if self.path_data(path).sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only evaluated for ACKs that advance the largest
            // acknowledged packet.
            if new_largest {
                let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // An ACK without ECN counts turns ECN off for this path.
            debug!("ECN not acknowledged by peer");
            self.path_data_mut(path).sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now, path);
    Ok(())
}
2494
2495 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2496 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2497
2498 if lost_packets.is_empty() {
2499 return false;
2500 }
2501
2502 for range in ack.iter() {
2503 let spurious_losses: Vec<u64> = lost_packets
2504 .iter_range(range.clone())
2505 .map(|(pn, _info)| pn)
2506 .collect();
2507
2508 for pn in spurious_losses {
2509 lost_packets.remove(pn);
2510 }
2511 }
2512
2513 lost_packets.is_empty()
2518 }
2519
2520 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2525 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2526
2527 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2528 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2529 }
2530
2531 fn process_ecn(
2533 &mut self,
2534 now: Instant,
2535 space: SpaceId,
2536 path: PathId,
2537 newly_acked: u64,
2538 ecn: frame::EcnCounts,
2539 largest_sent_time: Instant,
2540 ) {
2541 match self.spaces[space]
2542 .for_path(path)
2543 .detect_ecn(newly_acked, ecn)
2544 {
2545 Err(e) => {
2546 debug!("halting ECN due to verification failure: {}", e);
2547
2548 self.path_data_mut(path).sending_ecn = false;
2549 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2552 }
2553 Ok(false) => {}
2554 Ok(true) => {
2555 self.path_stats.entry(path).or_default().congestion_events += 1;
2556 self.path_data_mut(path).congestion.on_congestion_event(
2557 now,
2558 largest_sent_time,
2559 false,
2560 true,
2561 0,
2562 );
2563 }
2564 }
2565 }
2566
2567 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2570 self.paths
2571 .get_mut(&path_id)
2572 .expect("known path")
2573 .remove_in_flight(&info);
2574 let app_limited = self.app_limited;
2575 let path = self.path_data_mut(path_id);
2576 if info.ack_eliciting && !path.is_validating_path() {
2577 let rtt = path.rtt;
2580 path.congestion
2581 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2582 }
2583
2584 if let Some(retransmits) = info.retransmits.get() {
2586 for (id, _) in retransmits.reset_stream.iter() {
2587 self.streams.reset_acked(*id);
2588 }
2589 }
2590
2591 for frame in info.stream_frames {
2592 self.streams.received_ack_of(frame);
2593 }
2594 }
2595
2596 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2597 let start = if self.zero_rtt_crypto.is_some() {
2598 now
2599 } else {
2600 self.prev_crypto
2601 .as_ref()
2602 .expect("no previous keys")
2603 .end_packet
2604 .as_ref()
2605 .expect("update not acknowledged yet")
2606 .1
2607 };
2608
2609 self.timers.set(
2611 Timer::Conn(ConnTimer::KeyDiscard),
2612 start + self.pto_max_path(space, false) * 3,
2613 self.qlog.with_time(now),
2614 );
2615 }
2616
2617 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2630 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2631 self.detect_lost_packets(now, pn_space, path_id, false);
2633 self.set_loss_detection_timer(now, path_id);
2634 return;
2635 }
2636
2637 let (_, space) = match self.pto_time_and_space(now, path_id) {
2638 Some(x) => x,
2639 None => {
2640 error!(%path_id, "PTO expired while unset");
2641 return;
2642 }
2643 };
2644 trace!(
2645 in_flight = self.path_data(path_id).in_flight.bytes,
2646 count = self.path_data(path_id).pto_count,
2647 ?space,
2648 %path_id,
2649 "PTO fired"
2650 );
2651
2652 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2653 0 => {
2656 debug_assert!(!self.peer_completed_address_validation(path_id));
2657 1
2658 }
2659 _ => 2,
2661 };
2662 let pns = self.spaces[space].for_path(path_id);
2663 pns.loss_probes = pns.loss_probes.saturating_add(count);
2664 let path_data = self.path_data_mut(path_id);
2665 path_data.pto_count = path_data.pto_count.saturating_add(1);
2666 self.set_loss_detection_timer(now, path_id);
2667 }
2668
/// Scans the packets sent in `pn_space` on `path_id` and declares losses.
///
/// Only packets numbered below the largest acknowledged packet are examined. A
/// packet is deemed lost when it was sent at least `loss_delay` ago, or when
/// the largest acknowledged packet number is at least `packet_threshold` ahead
/// of it. For the first packet that is not yet lost, the space's `loss_time`
/// is set to when it would become lost. The results are passed to
/// `handle_lost_packets`.
///
/// `due_to_ack` enables the persistent congestion evaluation.
///
/// # Panics
///
/// Panics if no packet has been acknowledged yet for this path and space.
fn detect_lost_packets(
    &mut self,
    now: Instant,
    pn_space: SpaceId,
    path_id: PathId,
    due_to_ack: bool,
) {
    let mut lost_packets = Vec::<u64>::new();
    let mut lost_mtu_probe = None;
    let mut in_persistent_congestion = false;
    let mut size_of_lost_packets = 0u64;
    self.spaces[pn_space].for_path(path_id).loss_time = None;

    let path = self.path_data(path_id);
    let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
    // Time threshold for declaring a packet lost, never below the timer granularity.
    let loss_delay = path
        .rtt
        .conservative()
        .mul_f32(self.config.time_threshold)
        .max(TIMER_GRANULARITY);
    let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

    let largest_acked_packet = self.spaces[pn_space]
        .for_path(path_id)
        .largest_acked_packet
        .expect("detect_lost_packets only to be called if path received at least one ACK");
    let packet_threshold = self.config.packet_threshold as u64;

    // The persistent congestion period is always derived from the Data space PTO,
    // regardless of `pn_space`.
    let congestion_period = self
        .pto(SpaceId::Data, path_id)
        .saturating_mul(self.config.persistent_congestion_threshold);
    let mut persistent_congestion_start: Option<Instant> = None;
    let mut prev_packet = None;
    let space = self.spaces[pn_space].for_path(path_id);

    for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
        if prev_packet != Some(packet.wrapping_sub(1)) {
            // A gap in the tracked packet numbers restarts the persistent
            // congestion window.
            persistent_congestion_start = None;
        }

        // Elapsed time is compared with saturation instead of subtracting
        // `loss_delay` from `now`.
        let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
        if packet_too_old || largest_acked_packet >= packet + packet_threshold {
            if Some(packet) == in_flight_mtu_probe {
                // A lost MTU probe is reported separately and is not added to
                // `lost_packets`.
                lost_mtu_probe = in_flight_mtu_probe;
            } else {
                lost_packets.push(packet);
                size_of_lost_packets += info.size as u64;
                if info.ack_eliciting && due_to_ack {
                    match persistent_congestion_start {
                        // Consecutive ack-eliciting losses spanning more than
                        // `congestion_period`.
                        Some(start) if info.time_sent - start > congestion_period => {
                            in_persistent_congestion = true;
                        }
                        // A window can only start at a packet that comes after
                        // `first_packet_after_rtt_sample`.
                        None if first_packet_after_rtt_sample
                            .is_some_and(|x| x < (pn_space, packet)) =>
                        {
                            persistent_congestion_start = Some(info.time_sent);
                        }
                        _ => {}
                    }
                }
            }
        } else {
            // Not lost yet: only the first such packet sets `loss_time`.
            if space.loss_time.is_none() {
                space.loss_time = Some(info.time_sent + loss_delay);
            }
            persistent_congestion_start = None;
        }

        prev_packet = Some(packet);
    }

    self.handle_lost_packets(
        pn_space,
        path_id,
        now,
        lost_packets,
        lost_mtu_probe,
        loss_delay,
        in_persistent_congestion,
        size_of_lost_packets,
    );
}
2785
2786 fn discard_path(&mut self, path_id: PathId, now: Instant) {
2788 trace!(%path_id, "dropping path state");
2789 let path = self.path_data(path_id);
2790 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2791
2792 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2794 .for_path(path_id)
2795 .sent_packets
2796 .iter()
2797 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2798 .map(|(pn, info)| {
2799 size_of_lost_packets += info.size as u64;
2800 pn
2801 })
2802 .collect();
2803
2804 if !lost_pns.is_empty() {
2805 trace!(
2806 %path_id,
2807 count = lost_pns.len(),
2808 lost_bytes = size_of_lost_packets,
2809 "packets lost on path abandon"
2810 );
2811 self.handle_lost_packets(
2812 SpaceId::Data,
2813 path_id,
2814 now,
2815 lost_pns,
2816 in_flight_mtu_probe,
2817 Duration::ZERO,
2818 false,
2819 size_of_lost_packets,
2820 );
2821 }
2822 self.paths.remove(&path_id);
2823 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2824
2825 let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2826 self.events.push_back(
2827 PathEvent::Abandoned {
2828 id: path_id,
2829 path_stats,
2830 }
2831 .into(),
2832 );
2833 }
2834
/// Applies the consequences of packets having been declared lost.
///
/// `lost_packets` must be sorted ascending and must not include the MTU probe;
/// a lost MTU probe is passed separately as `lost_mtu_probe`. For each lost
/// packet the in-flight accounting is updated, its stream frames and other
/// retransmittable data are queued again, MTU discovery is informed and the
/// packet is remembered in the space's `lost_packets` set. A congestion event
/// is raised only if the bytes in flight changed as a result.
///
/// # Panics
///
/// Panics if the largest entry of `lost_packets` is no longer tracked in
/// `sent_packets`, if the path is unknown, or if `lost_mtu_probe` names a
/// packet that is no longer tracked.
fn handle_lost_packets(
    &mut self,
    pn_space: SpaceId,
    path_id: PathId,
    now: Instant,
    lost_packets: Vec<u64>,
    lost_mtu_probe: Option<u64>,
    loss_delay: Duration,
    in_persistent_congestion: bool,
    size_of_lost_packets: u64,
) {
    debug_assert!(
        {
            let mut sorted = lost_packets.clone();
            sorted.sort();
            sorted == lost_packets
        },
        "lost_packets must be sorted"
    );

    // Expire old lost-packet records before adding new ones.
    self.drain_lost_packets(now, pn_space, path_id);

    if let Some(largest_lost) = lost_packets.last().cloned() {
        let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
        // Read the send time now; the packet is removed from `sent_packets` below.
        let largest_lost_sent = self.spaces[pn_space]
            .for_path(path_id)
            .sent_packets
            .get(largest_lost)
            .unwrap()
            .time_sent;
        let path_stats = self.path_stats.entry(path_id).or_default();
        path_stats.lost_packets += lost_packets.len() as u64;
        path_stats.lost_bytes += size_of_lost_packets;
        trace!(
            %path_id,
            count = lost_packets.len(),
            lost_bytes = size_of_lost_packets,
            "packets lost",
        );

        for &packet in &lost_packets {
            // Packets that are no longer tracked are skipped.
            let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                continue;
            };
            self.qlog
                .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);

            // Queue the packet's contents for retransmission.
            for frame in info.stream_frames {
                self.streams.retransmit(frame);
            }
            self.spaces[pn_space].pending |= info.retransmits;
            self.path_data_mut(path_id)
                .mtud
                .on_non_probe_lost(packet, info.size);

            // Remember the loss so a later ACK can reveal it as spurious.
            self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                packet,
                LostPacket {
                    time_sent: info.time_sent,
                },
            );
        }

        let path = self.path_data_mut(path_id);
        if path.mtud.black_hole_detected(now) {
            // Propagate the MTU reported by MTU discovery and drop queued
            // datagrams that exceed the current maximum datagram size.
            path.congestion.on_mtu_update(path.mtud.current_mtu());
            if let Some(max_datagram_size) = self.datagrams().max_size() {
                self.datagrams.drop_oversized(max_datagram_size);
            }
            self.path_stats
                .entry(path_id)
                .or_default()
                .black_holes_detected += 1;
        }

        // A change in bytes in flight is used as the signal that the losses
        // warrant a congestion event.
        let lost_ack_eliciting =
            old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

        if lost_ack_eliciting {
            self.path_stats
                .entry(path_id)
                .or_default()
                .congestion_events += 1;
            self.path_data_mut(path_id).congestion.on_congestion_event(
                now,
                largest_lost_sent,
                in_persistent_congestion,
                false,
                size_of_lost_packets,
            );
        }
    }

    // A lost MTU probe is handled separately: it is removed from in-flight
    // accounting and reported to MTU discovery, with no congestion event.
    if let Some(packet) = lost_mtu_probe {
        let info = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .take(packet)
            .unwrap();
        self.paths
            .get_mut(&path_id)
            .unwrap()
            .remove_in_flight(&info);
        self.path_data_mut(path_id).mtud.on_probe_lost();
        self.path_stats
            .entry(path_id)
            .or_default()
            .lost_plpmtud_probes += 1;
    }
}
2952
2953 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2959 SpaceId::iter()
2960 .filter_map(|id| {
2961 self.spaces[id]
2962 .number_spaces
2963 .get(&path_id)
2964 .and_then(|pns| pns.loss_time)
2965 .map(|time| (time, id))
2966 })
2967 .min_by_key(|&(time, _)| time)
2968 }
2969
2970 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2972 let path = self.path(path_id)?;
2973 let pto_count = path.pto_count;
2974 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2975 let mut duration = path.rtt.pto_base() * backoff;
2976
2977 if path_id == PathId::ZERO
2978 && path.in_flight.ack_eliciting == 0
2979 && !self.peer_completed_address_validation(PathId::ZERO)
2980 {
2981 let space = match self.highest_space {
2987 SpaceId::Handshake => SpaceId::Handshake,
2988 _ => SpaceId::Initial,
2989 };
2990
2991 return Some((now + duration, space));
2992 }
2993
2994 let mut result = None;
2995 for space in SpaceId::iter() {
2996 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2997 continue;
2998 };
2999
3000 if !pns.has_in_flight() {
3001 continue;
3002 }
3003 if space == SpaceId::Data {
3004 if self.is_handshaking() {
3006 return result;
3007 }
3008 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3010 }
3011 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3012 continue;
3013 };
3014 let pto = last_ack_eliciting + duration;
3015 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3016 if path.anti_amplification_blocked(1) {
3017 continue;
3019 }
3020 if path.in_flight.ack_eliciting == 0 {
3021 continue;
3023 }
3024 result = Some((pto, space));
3025 }
3026 }
3027 result
3028 }
3029
3030 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3031 if self.side.is_server() || self.state.is_closed() {
3033 return true;
3034 }
3035 self.spaces[SpaceId::Handshake]
3038 .path_space(PathId::ZERO)
3039 .and_then(|pns| pns.largest_acked_packet)
3040 .is_some()
3041 || self.spaces[SpaceId::Data]
3042 .path_space(path)
3043 .and_then(|pns| pns.largest_acked_packet)
3044 .is_some()
3045 || (self.spaces[SpaceId::Data].crypto.is_some()
3046 && self.spaces[SpaceId::Handshake].crypto.is_none())
3047 }
3048
3049 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3057 if self.state.is_closed() {
3058 return;
3062 }
3063
3064 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3065 self.timers.set(
3067 Timer::PerPath(path_id, PathTimer::LossDetection),
3068 loss_time,
3069 self.qlog.with_time(now),
3070 );
3071 return;
3072 }
3073
3074 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3077 self.timers.set(
3078 Timer::PerPath(path_id, PathTimer::LossDetection),
3079 timeout,
3080 self.qlog.with_time(now),
3081 );
3082 } else {
3083 self.timers.stop(
3084 Timer::PerPath(path_id, PathTimer::LossDetection),
3085 self.qlog.with_time(now),
3086 );
3087 }
3088 }
3089
3090 fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3096 match space {
3097 SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3098 SpaceId::Data => self
3099 .paths
3100 .iter()
3101 .filter_map(|(path_id, state)| {
3102 if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3103 None
3105 } else {
3106 let pto = self.pto(space, *path_id);
3107 Some(pto)
3108 }
3109 })
3110 .max()
3111 .expect("there should be one at least path"),
3112 }
3113 }
3114
3115 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3120 let max_ack_delay = match space {
3121 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3122 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3123 };
3124 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3125 }
3126
3127 fn on_packet_authenticated(
3128 &mut self,
3129 now: Instant,
3130 space_id: SpaceId,
3131 path_id: PathId,
3132 ecn: Option<EcnCodepoint>,
3133 packet: Option<u64>,
3134 spin: bool,
3135 is_1rtt: bool,
3136 ) {
3137 self.total_authed_packets += 1;
3138 if let Some(last_allowed_receive) = self
3139 .paths
3140 .get(&path_id)
3141 .and_then(|path| path.data.last_allowed_receive)
3142 {
3143 if now > last_allowed_receive {
3144 warn!("received data on path which we abandoned more than 3 * PTO ago");
3145 if !self.state.is_closed() {
3147 self.state.move_to_closed(TransportError::NO_ERROR(
3149 "peer failed to respond with PATH_ABANDON in time",
3150 ));
3151 self.close_common();
3152 self.set_close_timer(now);
3153 self.close = true;
3154 }
3155 return;
3156 }
3157 }
3158
3159 self.reset_keep_alive(path_id, now);
3160 self.reset_idle_timeout(now, space_id, path_id);
3161 self.permit_idle_reset = true;
3162 self.receiving_ecn |= ecn.is_some();
3163 if let Some(x) = ecn {
3164 let space = &mut self.spaces[space_id];
3165 space.for_path(path_id).ecn_counters += x;
3166
3167 if x.is_ce() {
3168 space
3169 .for_path(path_id)
3170 .pending_acks
3171 .set_immediate_ack_required();
3172 }
3173 }
3174
3175 let packet = match packet {
3176 Some(x) => x,
3177 None => return,
3178 };
3179 match &self.side {
3180 ConnectionSide::Client { .. } => {
3181 if space_id == SpaceId::Handshake {
3185 if let Some(hs) = self.state.as_handshake_mut() {
3186 hs.allow_server_migration = false;
3187 }
3188 }
3189 }
3190 ConnectionSide::Server { .. } => {
3191 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3192 {
3193 self.discard_space(now, SpaceId::Initial);
3195 }
3196 if self.zero_rtt_crypto.is_some() && is_1rtt {
3197 self.set_key_discard_timer(now, space_id)
3199 }
3200 }
3201 }
3202 let space = self.spaces[space_id].for_path(path_id);
3203 space.pending_acks.insert_one(packet, now);
3204 if packet >= space.rx_packet.unwrap_or_default() {
3205 space.rx_packet = Some(packet);
3206 self.spin = self.side.is_client() ^ spin;
3208 }
3209 }
3210
3211 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3216 if let Some(timeout) = self.idle_timeout {
3218 if self.state.is_closed() {
3219 self.timers
3220 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3221 } else {
3222 let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3223 self.timers.set(
3224 Timer::Conn(ConnTimer::Idle),
3225 now + dt,
3226 self.qlog.with_time(now),
3227 );
3228 }
3229 }
3230
3231 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3233 if self.state.is_closed() {
3234 self.timers.stop(
3235 Timer::PerPath(path_id, PathTimer::PathIdle),
3236 self.qlog.with_time(now),
3237 );
3238 } else {
3239 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3240 self.timers.set(
3241 Timer::PerPath(path_id, PathTimer::PathIdle),
3242 now + dt,
3243 self.qlog.with_time(now),
3244 );
3245 }
3246 }
3247 }
3248
3249 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3251 if !self.state.is_established() {
3252 return;
3253 }
3254
3255 if let Some(interval) = self.config.keep_alive_interval {
3256 self.timers.set(
3257 Timer::Conn(ConnTimer::KeepAlive),
3258 now + interval,
3259 self.qlog.with_time(now),
3260 );
3261 }
3262
3263 if let Some(interval) = self.path_data(path_id).keep_alive {
3264 self.timers.set(
3265 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3266 now + interval,
3267 self.qlog.with_time(now),
3268 );
3269 }
3270 }
3271
3272 fn reset_cid_retirement(&mut self, now: Instant) {
3274 if let Some((_path, t)) = self.next_cid_retirement() {
3275 self.timers.set(
3276 Timer::Conn(ConnTimer::PushNewCid),
3277 t,
3278 self.qlog.with_time(now),
3279 );
3280 }
3281 }
3282
3283 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3285 self.local_cid_state
3286 .iter()
3287 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3288 .min_by_key(|(_path_id, timeout)| *timeout)
3289 }
3290
3291 pub(crate) fn handle_first_packet(
3296 &mut self,
3297 now: Instant,
3298 remote: SocketAddr,
3299 ecn: Option<EcnCodepoint>,
3300 packet_number: u64,
3301 packet: InitialPacket,
3302 remaining: Option<BytesMut>,
3303 ) -> Result<(), ConnectionError> {
3304 let span = trace_span!("first recv");
3305 let _guard = span.enter();
3306 debug_assert!(self.side.is_server());
3307 let len = packet.header_data.len() + packet.payload.len();
3308 let path_id = PathId::ZERO;
3309 self.path_data_mut(path_id).total_recvd = len as u64;
3310
3311 if let Some(hs) = self.state.as_handshake_mut() {
3312 hs.expected_token = packet.header.token.clone();
3313 } else {
3314 unreachable!("first packet must be delivered in Handshake state");
3315 }
3316
3317 self.on_packet_authenticated(
3319 now,
3320 SpaceId::Initial,
3321 path_id,
3322 ecn,
3323 Some(packet_number),
3324 false,
3325 false,
3326 );
3327
3328 let packet: Packet = packet.into();
3329
3330 let mut qlog = QlogRecvPacket::new(len);
3331 qlog.header(&packet.header, Some(packet_number), path_id);
3332
3333 self.process_decrypted_packet(
3334 now,
3335 remote,
3336 path_id,
3337 Some(packet_number),
3338 packet,
3339 &mut qlog,
3340 )?;
3341 self.qlog.emit_packet_received(qlog, now);
3342 if let Some(data) = remaining {
3343 self.handle_coalesced(now, remote, path_id, ecn, data);
3344 }
3345
3346 self.qlog.emit_recovery_metrics(
3347 path_id,
3348 &mut self.paths.get_mut(&path_id).unwrap().data,
3349 now,
3350 );
3351
3352 Ok(())
3353 }
3354
3355 fn init_0rtt(&mut self, now: Instant) {
3356 let (header, packet) = match self.crypto.early_crypto() {
3357 Some(x) => x,
3358 None => return,
3359 };
3360 if self.side.is_client() {
3361 match self.crypto.transport_parameters() {
3362 Ok(params) => {
3363 let params = params
3364 .expect("crypto layer didn't supply transport parameters with ticket");
3365 let params = TransportParameters {
3367 initial_src_cid: None,
3368 original_dst_cid: None,
3369 preferred_address: None,
3370 retry_src_cid: None,
3371 stateless_reset_token: None,
3372 min_ack_delay: None,
3373 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3374 max_ack_delay: TransportParameters::default().max_ack_delay,
3375 initial_max_path_id: None,
3376 ..params
3377 };
3378 self.set_peer_params(params);
3379 self.qlog.emit_peer_transport_params_restored(self, now);
3380 }
3381 Err(e) => {
3382 error!("session ticket has malformed transport parameters: {}", e);
3383 return;
3384 }
3385 }
3386 }
3387 trace!("0-RTT enabled");
3388 self.zero_rtt_enabled = true;
3389 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3390 }
3391
3392 fn read_crypto(
3393 &mut self,
3394 space: SpaceId,
3395 crypto: &frame::Crypto,
3396 payload_len: usize,
3397 ) -> Result<(), TransportError> {
3398 let expected = if !self.state.is_handshake() {
3399 SpaceId::Data
3400 } else if self.highest_space == SpaceId::Initial {
3401 SpaceId::Initial
3402 } else {
3403 SpaceId::Handshake
3406 };
3407 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3411
3412 let end = crypto.offset + crypto.data.len() as u64;
3413 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3414 warn!(
3415 "received new {:?} CRYPTO data when expecting {:?}",
3416 space, expected
3417 );
3418 return Err(TransportError::PROTOCOL_VIOLATION(
3419 "new data at unexpected encryption level",
3420 ));
3421 }
3422
3423 let space = &mut self.spaces[space];
3424 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3425 if max > self.config.crypto_buffer_size as u64 {
3426 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3427 }
3428
3429 space
3430 .crypto_stream
3431 .insert(crypto.offset, crypto.data.clone(), payload_len);
3432 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3433 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3434 if self.crypto.read_handshake(&chunk.bytes)? {
3435 self.events.push_back(Event::HandshakeDataReady);
3436 }
3437 }
3438
3439 Ok(())
3440 }
3441
3442 fn write_crypto(&mut self) {
3443 loop {
3444 let space = self.highest_space;
3445 let mut outgoing = Vec::new();
3446 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3447 match space {
3448 SpaceId::Initial => {
3449 self.upgrade_crypto(SpaceId::Handshake, crypto);
3450 }
3451 SpaceId::Handshake => {
3452 self.upgrade_crypto(SpaceId::Data, crypto);
3453 }
3454 _ => unreachable!("got updated secrets during 1-RTT"),
3455 }
3456 }
3457 if outgoing.is_empty() {
3458 if space == self.highest_space {
3459 break;
3460 } else {
3461 continue;
3463 }
3464 }
3465 let offset = self.spaces[space].crypto_offset;
3466 let outgoing = Bytes::from(outgoing);
3467 if let Some(hs) = self.state.as_handshake_mut() {
3468 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3469 hs.client_hello = Some(outgoing.clone());
3470 }
3471 }
3472 self.spaces[space].crypto_offset += outgoing.len() as u64;
3473 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3474 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3475 offset,
3476 data: outgoing,
3477 });
3478 }
3479 }
3480
3481 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3483 debug_assert!(
3484 self.spaces[space].crypto.is_none(),
3485 "already reached packet space {space:?}"
3486 );
3487 trace!("{:?} keys ready", space);
3488 if space == SpaceId::Data {
3489 self.next_crypto = Some(
3491 self.crypto
3492 .next_1rtt_keys()
3493 .expect("handshake should be complete"),
3494 );
3495 }
3496
3497 self.spaces[space].crypto = Some(crypto);
3498 debug_assert!(space as usize > self.highest_space as usize);
3499 self.highest_space = space;
3500 if space == SpaceId::Data && self.side.is_client() {
3501 self.zero_rtt_crypto = None;
3503 }
3504 }
3505
3506 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3507 debug_assert!(space_id != SpaceId::Data);
3508 trace!("discarding {:?} keys", space_id);
3509 if space_id == SpaceId::Initial {
3510 if let ConnectionSide::Client { token, .. } = &mut self.side {
3512 *token = Bytes::new();
3513 }
3514 }
3515 let space = &mut self.spaces[space_id];
3516 space.crypto = None;
3517 let pns = space.for_path(PathId::ZERO);
3518 pns.time_of_last_ack_eliciting_packet = None;
3519 pns.loss_time = None;
3520 pns.loss_probes = 0;
3521 let sent_packets = mem::take(&mut pns.sent_packets);
3522 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3523 for (_, packet) in sent_packets.into_iter() {
3524 path.data.remove_in_flight(&packet);
3525 }
3526
3527 self.set_loss_detection_timer(now, PathId::ZERO)
3528 }
3529
3530 fn handle_coalesced(
3531 &mut self,
3532 now: Instant,
3533 remote: SocketAddr,
3534 path_id: PathId,
3535 ecn: Option<EcnCodepoint>,
3536 data: BytesMut,
3537 ) {
3538 self.path_data_mut(path_id)
3539 .inc_total_recvd(data.len() as u64);
3540 let mut remaining = Some(data);
3541 let cid_len = self
3542 .local_cid_state
3543 .values()
3544 .map(|cid_state| cid_state.cid_len())
3545 .next()
3546 .expect("one cid_state must exist");
3547 while let Some(data) = remaining {
3548 match PartialDecode::new(
3549 data,
3550 &FixedLengthConnectionIdParser::new(cid_len),
3551 &[self.version],
3552 self.endpoint_config.grease_quic_bit,
3553 ) {
3554 Ok((partial_decode, rest)) => {
3555 remaining = rest;
3556 self.handle_decode(now, remote, path_id, ecn, partial_decode);
3557 }
3558 Err(e) => {
3559 trace!("malformed header: {}", e);
3560 return;
3561 }
3562 }
3563 }
3564 }
3565
3566 fn handle_decode(
3567 &mut self,
3568 now: Instant,
3569 remote: SocketAddr,
3570 path_id: PathId,
3571 ecn: Option<EcnCodepoint>,
3572 partial_decode: PartialDecode,
3573 ) {
3574 let qlog = QlogRecvPacket::new(partial_decode.len());
3575 if let Some(decoded) = packet_crypto::unprotect_header(
3576 partial_decode,
3577 &self.spaces,
3578 self.zero_rtt_crypto.as_ref(),
3579 self.peer_params.stateless_reset_token,
3580 ) {
3581 self.handle_packet(
3582 now,
3583 remote,
3584 path_id,
3585 ecn,
3586 decoded.packet,
3587 decoded.stateless_reset,
3588 qlog,
3589 );
3590 }
3591 }
3592
/// Handles a single packet after header protection was removed.
///
/// The packet is decrypted, de-duplicated and filtered, then processed according to the
/// connection state. Any error produced along the way is turned into a state transition
/// (closed, draining or drained), and the close timer, the `Drained` endpoint event and
/// the `close` flag are updated to match.
///
/// `packet` is `None` when only a stateless reset was recognised or nothing could be
/// decoded; `stateless_reset` takes precedence over every other outcome.
fn handle_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    path_id: PathId,
    ecn: Option<EcnCodepoint>,
    packet: Option<Packet>,
    stateless_reset: bool,
    mut qlog: QlogRecvPacket,
) {
    self.stats.udp_rx.ios += 1;
    if let Some(ref packet) = packet {
        trace!(
            "got {:?} packet ({} bytes) from {} using id {}",
            packet.header.space(),
            packet.payload.len() + packet.header_data.len(),
            remote,
            packet.header.dst_cid(),
        );
    }

    if self.is_handshaking() {
        // During the handshake only path zero is accepted.
        if path_id != PathId::ZERO {
            debug!(%remote, %path_id, "discarding multipath packet during handshake");
            return;
        }
        // A different remote address is only accepted while the handshake state still
        // allows server migration; the path's remote is then updated to the new address.
        if remote != self.path_data_mut(path_id).remote {
            if let Some(hs) = self.state.as_handshake() {
                if hs.allow_server_migration {
                    trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
                    self.path_data_mut(path_id).remote = remote;
                    self.qlog.emit_tuple_assigned(path_id, remote, now);
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            } else {
                debug!("discarding packet with unexpected remote during handshake");
                return;
            }
        }
    }

    // Remember the state before processing so that transitions can be detected below.
    let was_closed = self.state.is_closed();
    let was_drained = self.state.is_drained();

    // `Err(None)` stands for "no packet / could not authenticate", `Err(Some(_))` for a
    // packet that is illegal.
    let decrypted = match packet {
        None => Err(None),
        Some(mut packet) => self
            .decrypt_packet(now, path_id, &mut packet)
            .map(move |number| (packet, number)),
    };
    let result = match decrypted {
        // A stateless reset wins regardless of the decryption outcome.
        _ if stateless_reset => {
            debug!("got stateless reset");
            Err(ConnectionError::Reset)
        }
        Err(Some(e)) => {
            warn!("illegal packet: {}", e);
            Err(e.into())
        }
        Err(None) => {
            debug!("failed to authenticate packet");
            self.authentication_failures += 1;
            // The limit is taken from the keys of the highest packet space.
            let integrity_limit = self.spaces[self.highest_space]
                .crypto
                .as_ref()
                .unwrap()
                .packet
                .local
                .integrity_limit();
            if self.authentication_failures > integrity_limit {
                Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
            } else {
                // Below the limit the packet is dropped without any further effect.
                return;
            }
        }
        Ok((packet, number)) => {
            qlog.header(&packet.header, number, path_id);
            let span = match number {
                Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                None => trace_span!("recv", space = ?packet.header.space()),
            };
            let _guard = span.enter();

            // Duplicate detection needs both a packet number and existing per-path state
            // in the packet's space; `insert` returning `true` is treated as "duplicate".
            let dedup = self.spaces[packet.header.space()]
                .path_space_mut(path_id)
                .map(|pns| &mut pns.dedup);
            if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                debug!("discarding possible duplicate packet");
                self.qlog.emit_packet_received(qlog, now);
                return;
            } else if self.state.is_handshake() && packet.header.is_short() {
                trace!("dropping short packet during handshake");
                self.qlog.emit_packet_received(qlog, now);
                return;
            } else {
                // On the server, an Initial packet must carry the token that the
                // handshake state expects.
                if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                    if let Some(hs) = self.state.as_handshake() {
                        if self.side.is_server() && token != &hs.expected_token {
                            warn!("discarding Initial with invalid retry token");
                            self.qlog.emit_packet_received(qlog, now);
                            return;
                        }
                    }
                }

                if !self.state.is_closed() {
                    // Only short-header packets carry a spin bit.
                    let spin = match packet.header {
                        Header::Short { spin, .. } => spin,
                        _ => false,
                    };

                    // On the server `ensure_path` is invoked for every path that was
                    // not abandoned.
                    if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                        self.ensure_path(path_id, remote, now, number, None);
                    }
                    if self.paths.contains_key(&path_id) {
                        self.on_packet_authenticated(
                            now,
                            packet.header.space(),
                            path_id,
                            ecn,
                            number,
                            spin,
                            packet.header.is_1rtt(),
                        );
                    }
                }

                let res = self
                    .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);

                self.qlog.emit_packet_received(qlog, now);
                res
            }
        }
    };

    // Map errors to state transitions.
    if let Err(conn_err) = result {
        match conn_err {
            ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
            ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
            // A reset or an exceeded AEAD limit goes straight to drained.
            ConnectionError::Reset
            | ConnectionError::TransportError(TransportError {
                code: TransportErrorCode::AEAD_LIMIT_REACHED,
                ..
            }) => {
                self.state.move_to_drained(Some(conn_err));
            }
            ConnectionError::TimedOut => {
                unreachable!("timeouts aren't generated by packet processing");
            }
            ConnectionError::TransportError(err) => {
                debug!("closing connection due to transport error: {}", err);
                self.state.move_to_closed(err);
            }
            ConnectionError::VersionMismatch => {
                self.state.move_to_draining(Some(conn_err));
            }
            ConnectionError::LocallyClosed => {
                unreachable!("LocallyClosed isn't generated by packet processing");
            }
            ConnectionError::CidsExhausted => {
                unreachable!("CidsExhausted isn't generated by packet processing");
            }
        };
    }

    // The connection became closed while handling this packet.
    if !was_closed && self.state.is_closed() {
        self.close_common();
        if !self.state.is_drained() {
            self.set_close_timer(now);
        }
    }
    // The connection became drained while handling this packet: notify the endpoint and
    // stop the close timer.
    if !was_drained && self.state.is_drained() {
        self.endpoint_events.push_back(EndpointEventInner::Drained);
        self.timers
            .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
    }

    if matches!(self.state.as_type(), StateType::Closed) {
        // `close` is set only when the packet came from the path's current remote. If
        // the path is unknown, the packet's own remote is used for the comparison, which
        // makes it trivially true.
        let path_remote = self
            .paths
            .get(&path_id)
            .map(|p| p.data.remote)
            .unwrap_or(remote);
        self.close = remote == path_remote;
    }
}
3794
/// Processes a decrypted packet according to the current connection state.
///
/// * Unknown path: the packet is discarded.
/// * Established: Data-space packets go to `process_payload`, other packets that have
///   frames go to `process_early_payload`, everything else is discarded.
/// * Closed: frames are only scanned for a close frame, which moves the state to
///   draining.
/// * Draining / Drained: the packet is ignored.
/// * Handshake: the packet is dispatched on its header type (Retry, Handshake, Initial,
///   0-RTT, Version Negotiation).
///
/// # Errors
///
/// Returns a `ConnectionError` for protocol violations, missing or invalid transport
/// parameters, errors from payload processing, and `VersionMismatch` when a Version
/// Negotiation packet does not list our version.
///
/// # Panics
///
/// Panics if `number` is `None` for a packet that is passed to `process_payload`, and if
/// a short-header packet arrives here while in the handshake state.
fn process_decrypted_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    path_id: PathId,
    number: Option<u64>,
    packet: Packet,
    qlog: &mut QlogRecvPacket,
) -> Result<(), ConnectionError> {
    if !self.paths.contains_key(&path_id) {
        trace!(%path_id, ?number, "discarding packet for unknown path");
        return Ok(());
    }
    // Every state except Handshake returns from within this match; `state` is the
    // mutable handshake state.
    let state = match self.state.as_type() {
        StateType::Established => {
            match packet.header.space() {
                SpaceId::Data => {
                    self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
                }
                _ if packet.header.has_frames() => {
                    self.process_early_payload(now, path_id, packet, qlog)?
                }
                _ => {
                    trace!("discarding unexpected pre-handshake packet");
                }
            }
            return Ok(());
        }
        StateType::Closed => {
            // Frames that fail to decode are skipped rather than treated as an error.
            for result in frame::Iter::new(packet.payload.freeze())? {
                let frame = match result {
                    Ok(frame) => frame,
                    Err(err) => {
                        debug!("frame decoding error: {err:?}");
                        continue;
                    }
                };
                qlog.frame(&frame);

                if let Frame::Padding = frame {
                    continue;
                };

                self.stats.frame_rx.record(&frame);

                if let Frame::Close(_error) = frame {
                    trace!("draining");
                    self.state.move_to_draining(None);
                    break;
                }
            }
            return Ok(());
        }
        StateType::Draining | StateType::Drained => return Ok(()),
        StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
    };

    match packet.header {
        Header::Retry {
            src_cid: rem_cid, ..
        } => {
            debug_assert_eq!(path_id, PathId::ZERO);
            if self.side.is_server() {
                return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
            }

            // The Retry is validated against the currently active remote CID of the
            // path; without remote CID state for the path it counts as invalid.
            let is_valid_retry = self
                .rem_cids
                .get(&path_id)
                .map(|cids| cids.active())
                .map(|orig_dst_cid| {
                    self.crypto.is_valid_retry(
                        orig_dst_cid,
                        &packet.header_data,
                        &packet.payload,
                    )
                })
                .unwrap_or_default();
            // A Retry is ignored once more than one packet was authenticated, when the
            // payload is not longer than 16 bytes (the last 16 bytes are not part of the
            // token, see `token_len` below), or when validation fails.
            if self.total_authed_packets > 1
                || packet.payload.len() <= 16 || !is_valid_retry
            {
                trace!("discarding invalid Retry");
                return Ok(());
            }

            trace!("retrying with CID {}", rem_cid);
            let client_hello = state.client_hello.take().unwrap();
            self.retry_src_cid = Some(rem_cid);
            self.rem_cids
                .get_mut(&path_id)
                .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
                .update_initial_cid(rem_cid);
            self.rem_handshake_cid = rem_cid;

            // Initial packet number 0, if still tracked, is handled as acknowledged.
            let space = &mut self.spaces[SpaceId::Initial];
            if let Some(info) = space.for_path(PathId::ZERO).take(0) {
                self.on_packet_acked(now, PathId::ZERO, info);
            };

            // The Initial space is discarded and rebuilt with keys derived from the new
            // CID. The next packet number is carried over from the old space and the
            // stored client hello is queued again at offset 0.
            self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = {
                let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
                space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
                space.crypto_offset = client_hello.len() as u64;
                space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
                    .for_path(path_id)
                    .next_packet_number;
                space.pending.crypto.push_back(frame::Crypto {
                    offset: 0,
                    data: client_hello,
                });
                space
            };

            // Everything already sent in the Data space on path zero is taken out of
            // flight and queued for retransmission.
            let zero_rtt = mem::take(
                &mut self.spaces[SpaceId::Data]
                    .for_path(PathId::ZERO)
                    .sent_packets,
            );
            for (_, info) in zero_rtt.into_iter() {
                self.paths
                    .get_mut(&PathId::ZERO)
                    .unwrap()
                    .remove_in_flight(&info);
                self.spaces[SpaceId::Data].pending |= info.retransmits;
            }
            self.streams.retransmit_all_for_0rtt();

            // The token is the payload without its trailing 16 bytes.
            let token_len = packet.payload.len() - 16;
            let ConnectionSide::Client { ref mut token, .. } = self.side else {
                unreachable!("we already short-circuited if we're server");
            };
            *token = packet.payload.freeze().split_to(token_len);

            // Restart the handshake from a fresh handshake state.
            self.state = State::handshake(state::Handshake {
                expected_token: Bytes::new(),
                rem_cid_set: false,
                client_hello: None,
                allow_server_migration: true,
            });
            Ok(())
        }
        Header::Long {
            ty: LongType::Handshake,
            src_cid: rem_cid,
            dst_cid: loc_cid,
            ..
        } => {
            debug_assert_eq!(path_id, PathId::ZERO);
            if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }
            self.on_path_validated(path_id);

            self.process_early_payload(now, path_id, packet, qlog)?;
            if self.state.is_closed() {
                return Ok(());
            }

            if self.crypto.is_handshaking() {
                trace!("handshake ongoing");
                return Ok(());
            }

            // From here on the crypto handshake has finished.
            if self.side.is_client() {
                let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                    TransportError::new(
                        TransportErrorCode::crypto(0x6d),
                        "transport parameters missing".to_owned(),
                    )
                })?;

                if self.has_0rtt() {
                    if !self.crypto.early_data_accepted().unwrap() {
                        debug_assert!(self.side.is_client());
                        debug!("0-RTT rejected");
                        self.accepted_0rtt = false;
                        self.streams.zero_rtt_rejected();

                        // Drop the frames queued so far in the Data space.
                        self.spaces[SpaceId::Data].pending = Retransmits::default();

                        // Forget the packets already sent in the Data space on this path.
                        let sent_packets = mem::take(
                            &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
                        );
                        for (_, packet) in sent_packets.into_iter() {
                            self.paths
                                .get_mut(&path_id)
                                .unwrap()
                                .remove_in_flight(&packet);
                        }
                    } else {
                        self.accepted_0rtt = true;
                        params.validate_resumption_from(&self.peer_params)?;
                    }
                }
                if let Some(token) = params.stateless_reset_token {
                    let remote = self.path_data(path_id).remote;
                    self.endpoint_events
                        .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
                }
                self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                self.issue_first_cids(now);
            } else {
                // Server side: queue HANDSHAKE_DONE, drop the Handshake space and report
                // the handshake as confirmed.
                self.spaces[SpaceId::Data].pending.handshake_done = true;
                self.discard_space(now, SpaceId::Handshake);
                self.events.push_back(Event::HandshakeConfirmed);
                trace!("handshake confirmed");
            }

            self.events.push_back(Event::Connected);
            self.state.move_to_established();
            trace!("established");

            self.issue_first_path_cids(now);
            Ok(())
        }
        Header::Initial(InitialHeader {
            src_cid: rem_cid,
            dst_cid: loc_cid,
            ..
        }) => {
            debug_assert_eq!(path_id, PathId::ZERO);
            // The first Initial packet fixes the remote CID; later ones must match it.
            if !state.rem_cid_set {
                trace!("switching remote CID to {}", rem_cid);
                let mut state = state.clone();
                self.rem_cids
                    .get_mut(&path_id)
                    .expect("PathId::ZERO not yet abandoned")
                    .update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;
                self.orig_rem_cid = rem_cid;
                state.rem_cid_set = true;
                self.state.move_to_handshake(state);
            } else if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }

            let starting_space = self.highest_space;
            self.process_early_payload(now, path_id, packet, qlog)?;

            // On the server, the packet that moves us beyond the Initial space is the
            // point at which the peer's transport parameters are applied.
            if self.side.is_server()
                && starting_space == SpaceId::Initial
                && self.highest_space != SpaceId::Initial
            {
                let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                    TransportError::new(
                        TransportErrorCode::crypto(0x6d),
                        "transport parameters missing".to_owned(),
                    )
                })?;
                self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                self.issue_first_cids(now);
                self.init_0rtt(now);
            }
            Ok(())
        }
        Header::Long {
            ty: LongType::ZeroRtt,
            ..
        } => {
            self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
            Ok(())
        }
        Header::VersionNegotiate { .. } => {
            // Ignored once more than one packet has been authenticated.
            if self.total_authed_packets > 1 {
                return Ok(());
            }
            // The payload is read as a list of 4-byte big-endian versions; a trailing
            // chunk shorter than 4 bytes never matches.
            let supported = packet
                .payload
                .chunks(4)
                .any(|x| match <[u8; 4]>::try_from(x) {
                    Ok(version) => self.version == u32::from_be_bytes(version),
                    Err(_) => false,
                });
            // A packet that lists our own version is ignored.
            if supported {
                return Ok(());
            }
            debug!("remote doesn't support our version");
            Err(ConnectionError::VersionMismatch)
        }
        Header::Short { .. } => unreachable!(
            "short packets received during handshake are discarded in handle_packet"
        ),
    }
}
4107
4108 fn process_early_payload(
4110 &mut self,
4111 now: Instant,
4112 path_id: PathId,
4113 packet: Packet,
4114 #[allow(unused)] qlog: &mut QlogRecvPacket,
4115 ) -> Result<(), TransportError> {
4116 debug_assert_ne!(packet.header.space(), SpaceId::Data);
4117 debug_assert_eq!(path_id, PathId::ZERO);
4118 let payload_len = packet.payload.len();
4119 let mut ack_eliciting = false;
4120 for result in frame::Iter::new(packet.payload.freeze())? {
4121 let frame = result?;
4122 qlog.frame(&frame);
4123 let span = match frame {
4124 Frame::Padding => continue,
4125 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4126 };
4127
4128 self.stats.frame_rx.record(&frame);
4129
4130 let _guard = span.as_ref().map(|x| x.enter());
4131 ack_eliciting |= frame.is_ack_eliciting();
4132
4133 if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4135 return Err(TransportError::PROTOCOL_VIOLATION(
4136 "illegal frame type in handshake",
4137 ));
4138 }
4139
4140 match frame {
4141 Frame::Padding | Frame::Ping => {}
4142 Frame::Crypto(frame) => {
4143 self.read_crypto(packet.header.space(), &frame, payload_len)?;
4144 }
4145 Frame::Ack(ack) => {
4146 self.on_ack_received(now, packet.header.space(), ack)?;
4147 }
4148 Frame::PathAck(ack) => {
4149 span.as_ref()
4150 .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4151 self.on_path_ack_received(now, packet.header.space(), ack)?;
4152 }
4153 Frame::Close(reason) => {
4154 self.state.move_to_draining(Some(reason.into()));
4155 return Ok(());
4156 }
4157 _ => {
4158 let mut err =
4159 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4160 err.frame = Some(frame.ty());
4161 return Err(err);
4162 }
4163 }
4164 }
4165
4166 if ack_eliciting {
4167 self.spaces[packet.header.space()]
4169 .for_path(path_id)
4170 .pending_acks
4171 .set_immediate_ack_required();
4172 }
4173
4174 self.write_crypto();
4175 Ok(())
4176 }
4177
4178 fn process_payload(
4180 &mut self,
4181 now: Instant,
4182 remote: SocketAddr,
4183 path_id: PathId,
4184 number: u64,
4185 packet: Packet,
4186 #[allow(unused)] qlog: &mut QlogRecvPacket,
4187 ) -> Result<(), TransportError> {
4188 let payload = packet.payload.freeze();
4189 let mut is_probing_packet = true;
4190 let mut close = None;
4191 let payload_len = payload.len();
4192 let mut ack_eliciting = false;
4193 let mut migration_observed_addr = None;
4196 for result in frame::Iter::new(payload)? {
4197 let frame = result?;
4198 qlog.frame(&frame);
4199 let span = match frame {
4200 Frame::Padding => continue,
4201 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4202 };
4203
4204 self.stats.frame_rx.record(&frame);
4205 match &frame {
4208 Frame::Crypto(f) => {
4209 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4210 }
4211 Frame::Stream(f) => {
4212 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4213 }
4214 Frame::Datagram(f) => {
4215 trace!(len = f.data.len(), "got datagram frame");
4216 }
4217 f => {
4218 trace!("got frame {:?}", f);
4219 }
4220 }
4221
4222 let _guard = span.enter();
4223 if packet.header.is_0rtt() {
4224 match frame {
4225 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4226 return Err(TransportError::PROTOCOL_VIOLATION(
4227 "illegal frame type in 0-RTT",
4228 ));
4229 }
4230 _ => {
4231 if frame.is_1rtt() {
4232 return Err(TransportError::PROTOCOL_VIOLATION(
4233 "illegal frame type in 0-RTT",
4234 ));
4235 }
4236 }
4237 }
4238 }
4239 ack_eliciting |= frame.is_ack_eliciting();
4240
4241 match frame {
4243 Frame::Padding
4244 | Frame::PathChallenge(_)
4245 | Frame::PathResponse(_)
4246 | Frame::NewConnectionId(_)
4247 | Frame::ObservedAddr(_) => {}
4248 _ => {
4249 is_probing_packet = false;
4250 }
4251 }
4252
4253 match frame {
4254 Frame::Crypto(frame) => {
4255 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4256 }
4257 Frame::Stream(frame) => {
4258 if self.streams.received(frame, payload_len)?.should_transmit() {
4259 self.spaces[SpaceId::Data].pending.max_data = true;
4260 }
4261 }
4262 Frame::Ack(ack) => {
4263 self.on_ack_received(now, SpaceId::Data, ack)?;
4264 }
4265 Frame::PathAck(ack) => {
4266 span.record("path", tracing::field::debug(&ack.path_id));
4267 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4268 }
4269 Frame::Padding | Frame::Ping => {}
4270 Frame::Close(reason) => {
4271 close = Some(reason);
4272 }
4273 Frame::PathChallenge(challenge) => {
4274 let path = &mut self
4275 .path_mut(path_id)
4276 .expect("payload is processed only after the path becomes known");
4277 path.path_responses.push(number, challenge.0, remote);
4278 if remote == path.remote {
4279 match self.peer_supports_ack_frequency() {
4289 true => self.immediate_ack(path_id),
4290 false => {
4291 self.ping_path(path_id).ok();
4292 }
4293 }
4294 }
4295 }
4296 Frame::PathResponse(response) => {
4297 let path = self
4298 .paths
4299 .get_mut(&path_id)
4300 .expect("payload is processed only after the path becomes known");
4301
4302 use PathTimer::*;
4303 use paths::OnPathResponseReceived::*;
4304 match path.data.on_path_response_received(now, response.0, remote) {
4305 OnPath { was_open } => {
4306 let qlog = self.qlog.with_time(now);
4307
4308 self.timers
4309 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4310 self.timers
4311 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4312
4313 let next_challenge = path
4314 .data
4315 .earliest_expiring_challenge()
4316 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4317 self.timers.set_or_stop(
4318 Timer::PerPath(path_id, PathChallengeLost),
4319 next_challenge,
4320 qlog,
4321 );
4322
4323 if !was_open {
4324 self.events
4325 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4326 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4327 {
4328 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4329 id: path_id,
4330 addr: observed.socket_addr(),
4331 }));
4332 }
4333 }
4334 if let Some((_, ref mut prev)) = path.prev {
4335 prev.challenges_sent.clear();
4336 prev.send_new_challenge = false;
4337 }
4338 }
4339 OffPath => {
4340 debug!("Response to off-path PathChallenge!");
4341 let next_challenge = path
4342 .data
4343 .earliest_expiring_challenge()
4344 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4345 self.timers.set_or_stop(
4346 Timer::PerPath(path_id, PathChallengeLost),
4347 next_challenge,
4348 self.qlog.with_time(now),
4349 );
4350 }
4351 Invalid { expected } => {
4352 debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE")
4353 }
4354 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4355 }
4356 }
4357 Frame::MaxData(bytes) => {
4358 self.streams.received_max_data(bytes);
4359 }
4360 Frame::MaxStreamData { id, offset } => {
4361 self.streams.received_max_stream_data(id, offset)?;
4362 }
4363 Frame::MaxStreams { dir, count } => {
4364 self.streams.received_max_streams(dir, count)?;
4365 }
4366 Frame::ResetStream(frame) => {
4367 if self.streams.received_reset(frame)?.should_transmit() {
4368 self.spaces[SpaceId::Data].pending.max_data = true;
4369 }
4370 }
4371 Frame::DataBlocked { offset } => {
4372 debug!(offset, "peer claims to be blocked at connection level");
4373 }
4374 Frame::StreamDataBlocked { id, offset } => {
4375 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4376 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4377 return Err(TransportError::STREAM_STATE_ERROR(
4378 "STREAM_DATA_BLOCKED on send-only stream",
4379 ));
4380 }
4381 debug!(
4382 stream = %id,
4383 offset, "peer claims to be blocked at stream level"
4384 );
4385 }
4386 Frame::StreamsBlocked { dir, limit } => {
4387 if limit > MAX_STREAM_COUNT {
4388 return Err(TransportError::FRAME_ENCODING_ERROR(
4389 "unrepresentable stream limit",
4390 ));
4391 }
4392 debug!(
4393 "peer claims to be blocked opening more than {} {} streams",
4394 limit, dir
4395 );
4396 }
4397 Frame::StopSending(frame::StopSending { id, error_code }) => {
4398 if id.initiator() != self.side.side() {
4399 if id.dir() == Dir::Uni {
4400 debug!("got STOP_SENDING on recv-only {}", id);
4401 return Err(TransportError::STREAM_STATE_ERROR(
4402 "STOP_SENDING on recv-only stream",
4403 ));
4404 }
4405 } else if self.streams.is_local_unopened(id) {
4406 return Err(TransportError::STREAM_STATE_ERROR(
4407 "STOP_SENDING on unopened stream",
4408 ));
4409 }
4410 self.streams.received_stop_sending(id, error_code);
4411 }
4412 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4413 if let Some(ref path_id) = path_id {
4414 span.record("path", tracing::field::debug(&path_id));
4415 }
4416 let path_id = path_id.unwrap_or_default();
4417 match self.local_cid_state.get_mut(&path_id) {
4418 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4419 Some(cid_state) => {
4420 let allow_more_cids = cid_state
4421 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4422
4423 let has_path = !self.abandoned_paths.contains(&path_id);
4427 let allow_more_cids = allow_more_cids && has_path;
4428
4429 self.endpoint_events
4430 .push_back(EndpointEventInner::RetireConnectionId(
4431 now,
4432 path_id,
4433 sequence,
4434 allow_more_cids,
4435 ));
4436 }
4437 }
4438 }
4439 Frame::NewConnectionId(frame) => {
4440 let path_id = if let Some(path_id) = frame.path_id {
4441 if !self.is_multipath_negotiated() {
4442 return Err(TransportError::PROTOCOL_VIOLATION(
4443 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4444 ));
4445 }
4446 if path_id > self.local_max_path_id {
4447 return Err(TransportError::PROTOCOL_VIOLATION(
4448 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4449 ));
4450 }
4451 path_id
4452 } else {
4453 PathId::ZERO
4454 };
4455
4456 if self.abandoned_paths.contains(&path_id) {
4457 trace!("ignoring issued CID for abandoned path");
4458 continue;
4459 }
4460 if let Some(ref path_id) = frame.path_id {
4461 span.record("path", tracing::field::debug(&path_id));
4462 }
4463 let rem_cids = self
4464 .rem_cids
4465 .entry(path_id)
4466 .or_insert_with(|| CidQueue::new(frame.id));
4467 if rem_cids.active().is_empty() {
4468 return Err(TransportError::PROTOCOL_VIOLATION(
4470 "NEW_CONNECTION_ID when CIDs aren't in use",
4471 ));
4472 }
4473 if frame.retire_prior_to > frame.sequence {
4474 return Err(TransportError::PROTOCOL_VIOLATION(
4475 "NEW_CONNECTION_ID retiring unissued CIDs",
4476 ));
4477 }
4478
4479 use crate::cid_queue::InsertError;
4480 match rem_cids.insert(frame) {
4481 Ok(None) => {}
4482 Ok(Some((retired, reset_token))) => {
4483 let pending_retired =
4484 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4485 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4488 if (pending_retired.len() as u64)
4491 .saturating_add(retired.end.saturating_sub(retired.start))
4492 > MAX_PENDING_RETIRED_CIDS
4493 {
4494 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4495 "queued too many retired CIDs",
4496 ));
4497 }
4498 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4499 self.set_reset_token(path_id, remote, reset_token);
4500 }
4501 Err(InsertError::ExceedsLimit) => {
4502 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4503 }
4504 Err(InsertError::Retired) => {
4505 trace!("discarding already-retired");
4506 self.spaces[SpaceId::Data]
4510 .pending
4511 .retire_cids
4512 .push((path_id, frame.sequence));
4513 continue;
4514 }
4515 };
4516
4517 if self.side.is_server()
4518 && path_id == PathId::ZERO
4519 && self
4520 .rem_cids
4521 .get(&PathId::ZERO)
4522 .map(|cids| cids.active_seq() == 0)
4523 .unwrap_or_default()
4524 {
4525 self.update_rem_cid(PathId::ZERO);
4528 }
4529 }
4530 Frame::NewToken(NewToken { token }) => {
4531 let ConnectionSide::Client {
4532 token_store,
4533 server_name,
4534 ..
4535 } = &self.side
4536 else {
4537 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4538 };
4539 if token.is_empty() {
4540 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4541 }
4542 trace!("got new token");
4543 token_store.insert(server_name, token);
4544 }
4545 Frame::Datagram(datagram) => {
4546 if self
4547 .datagrams
4548 .received(datagram, &self.config.datagram_receive_buffer_size)?
4549 {
4550 self.events.push_back(Event::DatagramReceived);
4551 }
4552 }
4553 Frame::AckFrequency(ack_frequency) => {
4554 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4557 continue;
4560 }
4561
4562 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4564 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4565
4566 if let Some(timeout) = space
4569 .pending_acks
4570 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4571 {
4572 self.timers.set(
4573 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4574 timeout,
4575 self.qlog.with_time(now),
4576 );
4577 }
4578 }
4579 }
4580 Frame::ImmediateAck => {
4581 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4583 pns.pending_acks.set_immediate_ack_required();
4584 }
4585 }
4586 Frame::HandshakeDone => {
4587 if self.side.is_server() {
4588 return Err(TransportError::PROTOCOL_VIOLATION(
4589 "client sent HANDSHAKE_DONE",
4590 ));
4591 }
4592 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4593 self.discard_space(now, SpaceId::Handshake);
4594 }
4595 self.events.push_back(Event::HandshakeConfirmed);
4596 trace!("handshake confirmed");
4597 }
4598 Frame::ObservedAddr(observed) => {
4599 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4601 if !self
4602 .peer_params
4603 .address_discovery_role
4604 .should_report(&self.config.address_discovery_role)
4605 {
4606 return Err(TransportError::PROTOCOL_VIOLATION(
4607 "received OBSERVED_ADDRESS frame when not negotiated",
4608 ));
4609 }
4610 if packet.header.space() != SpaceId::Data {
4612 return Err(TransportError::PROTOCOL_VIOLATION(
4613 "OBSERVED_ADDRESS frame outside data space",
4614 ));
4615 }
4616
4617 let path = self.path_data_mut(path_id);
4618 if remote == path.remote {
4619 if let Some(updated) = path.update_observed_addr_report(observed) {
4620 if path.open {
4621 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4622 id: path_id,
4623 addr: updated,
4624 }));
4625 }
4626 }
4628 } else {
4629 migration_observed_addr = Some(observed)
4631 }
4632 }
4633 Frame::PathAbandon(frame::PathAbandon {
4634 path_id,
4635 error_code,
4636 }) => {
4637 span.record("path", tracing::field::debug(&path_id));
4638 let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4640 Ok(()) => {
4641 trace!("peer abandoned path");
4642 false
4643 }
4644 Err(ClosePathError::LastOpenPath) => {
4645 trace!("peer abandoned last path, closing connection");
4646 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4648 }
4649 Err(ClosePathError::ClosedPath) => {
4650 trace!("peer abandoned already closed path");
4651 true
4652 }
4653 };
4654 if self.path(path_id).is_some() && !already_abandoned {
4659 let delay = self.pto(SpaceId::Data, path_id) * 3;
4664 self.timers.set(
4665 Timer::PerPath(path_id, PathTimer::DiscardPath),
4666 now + delay,
4667 self.qlog.with_time(now),
4668 );
4669 }
4670 }
4671 Frame::PathStatusAvailable(info) => {
4672 span.record("path", tracing::field::debug(&info.path_id));
4673 if self.is_multipath_negotiated() {
4674 self.on_path_status(
4675 info.path_id,
4676 PathStatus::Available,
4677 info.status_seq_no,
4678 );
4679 } else {
4680 return Err(TransportError::PROTOCOL_VIOLATION(
4681 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4682 ));
4683 }
4684 }
4685 Frame::PathStatusBackup(info) => {
4686 span.record("path", tracing::field::debug(&info.path_id));
4687 if self.is_multipath_negotiated() {
4688 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4689 } else {
4690 return Err(TransportError::PROTOCOL_VIOLATION(
4691 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4692 ));
4693 }
4694 }
4695 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4696 span.record("path", tracing::field::debug(&path_id));
4697 if !self.is_multipath_negotiated() {
4698 return Err(TransportError::PROTOCOL_VIOLATION(
4699 "received MAX_PATH_ID frame when multipath was not negotiated",
4700 ));
4701 }
4702 if path_id > self.remote_max_path_id {
4704 self.remote_max_path_id = path_id;
4705 self.issue_first_path_cids(now);
4706 }
4707 }
4708 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4709 if self.is_multipath_negotiated() {
4713 if self.local_max_path_id > max_path_id {
4714 return Err(TransportError::PROTOCOL_VIOLATION(
4715 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4716 ));
4717 }
4718 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4719 } else {
4721 return Err(TransportError::PROTOCOL_VIOLATION(
4722 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4723 ));
4724 }
4725 }
4726 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4727 if self.is_multipath_negotiated() {
4735 if path_id > self.local_max_path_id {
4736 return Err(TransportError::PROTOCOL_VIOLATION(
4737 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4738 ));
4739 }
4740 if next_seq.0
4741 > self
4742 .local_cid_state
4743 .get(&path_id)
4744 .map(|cid_state| cid_state.active_seq().1 + 1)
4745 .unwrap_or_default()
4746 {
4747 return Err(TransportError::PROTOCOL_VIOLATION(
4748 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4749 ));
4750 }
4751 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4752 } else {
4753 return Err(TransportError::PROTOCOL_VIOLATION(
4754 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4755 ));
4756 }
4757 }
4758 Frame::AddAddress(addr) => {
4759 let client_state = match self.iroh_hp.client_side_mut() {
4760 Ok(state) => state,
4761 Err(err) => {
4762 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4763 "Nat traversal(ADD_ADDRESS): {err}"
4764 )));
4765 }
4766 };
4767
4768 if !client_state.check_remote_address(&addr) {
4769 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4771 }
4772
4773 match client_state.add_remote_address(addr) {
4774 Ok(maybe_added) => {
4775 if let Some(added) = maybe_added {
4776 self.events.push_back(Event::NatTraversal(
4777 iroh_hp::Event::AddressAdded(added),
4778 ));
4779 }
4780 }
4781 Err(e) => {
4782 warn!(%e, "failed to add remote address")
4783 }
4784 }
4785 }
4786 Frame::RemoveAddress(addr) => {
4787 let client_state = match self.iroh_hp.client_side_mut() {
4788 Ok(state) => state,
4789 Err(err) => {
4790 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4791 "Nat traversal(REMOVE_ADDRESS): {err}"
4792 )));
4793 }
4794 };
4795 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4796 self.events
4797 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4798 removed_addr,
4799 )));
4800 }
4801 }
4802 Frame::ReachOut(reach_out) => {
4803 let server_state = match self.iroh_hp.server_side_mut() {
4804 Ok(state) => state,
4805 Err(err) => {
4806 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4807 "Nat traversal(REACH_OUT): {err}"
4808 )));
4809 }
4810 };
4811
4812 if let Err(err) = server_state.handle_reach_out(reach_out) {
4813 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4814 "Nat traversal(REACH_OUT): {err}"
4815 )));
4816 }
4817 }
4818 }
4819 }
4820
4821 let space = self.spaces[SpaceId::Data].for_path(path_id);
4822 if space
4823 .pending_acks
4824 .packet_received(now, number, ack_eliciting, &space.dedup)
4825 {
4826 if self.abandoned_paths.contains(&path_id) {
4827 space.pending_acks.set_immediate_ack_required();
4830 } else {
4831 self.timers.set(
4832 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4833 now + self.ack_frequency.max_ack_delay,
4834 self.qlog.with_time(now),
4835 );
4836 }
4837 }
4838
4839 let pending = &mut self.spaces[SpaceId::Data].pending;
4844 self.streams.queue_max_stream_id(pending);
4845
4846 if let Some(reason) = close {
4847 self.state.move_to_draining(Some(reason.into()));
4848 self.close = true;
4849 }
4850
4851 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4852 && !is_probing_packet
4853 && remote != self.path_data(path_id).remote
4854 {
4855 let ConnectionSide::Server { ref server_config } = self.side else {
4856 panic!("packets from unknown remote should be dropped by clients");
4857 };
4858 debug_assert!(
4859 server_config.migration,
4860 "migration-initiating packets should have been dropped immediately"
4861 );
4862 self.migrate(path_id, now, remote, migration_observed_addr);
4863 self.update_rem_cid(path_id);
4865 self.spin = false;
4866 }
4867
4868 Ok(())
4869 }
4870
4871 fn migrate(
4872 &mut self,
4873 path_id: PathId,
4874 now: Instant,
4875 remote: SocketAddr,
4876 observed_addr: Option<ObservedAddr>,
4877 ) {
4878 trace!(%remote, %path_id, "migration initiated");
4879 self.path_counter = self.path_counter.wrapping_add(1);
4880 let prev_pto = self.pto(SpaceId::Data, path_id);
4887 let known_path = self.paths.get_mut(&path_id).expect("known path");
4888 let path = &mut known_path.data;
4889 let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4890 PathData::from_previous(remote, path, self.path_counter, now)
4891 } else {
4892 let peer_max_udp_payload_size =
4893 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4894 .unwrap_or(u16::MAX);
4895 PathData::new(
4896 remote,
4897 self.allow_mtud,
4898 Some(peer_max_udp_payload_size),
4899 self.path_counter,
4900 now,
4901 &self.config,
4902 )
4903 };
4904 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4905 if let Some(report) = observed_addr {
4906 if let Some(updated) = new_path.update_observed_addr_report(report) {
4907 tracing::info!("adding observed addr event from migration");
4908 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4909 id: path_id,
4910 addr: updated,
4911 }));
4912 }
4913 }
4914 new_path.send_new_challenge = true;
4915
4916 let mut prev = mem::replace(path, new_path);
4917 if !prev.is_validating_path() {
4919 prev.send_new_challenge = true;
4920 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4924 }
4925
4926 self.qlog.emit_tuple_assigned(path_id, remote, now);
4928
4929 self.timers.set(
4930 Timer::PerPath(path_id, PathTimer::PathValidation),
4931 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4932 self.qlog.with_time(now),
4933 );
4934 }
4935
/// Notifies the connection that the local address changed (going by the method name).
///
/// Attempts to switch path zero to another remote connection ID via `update_rem_cid` and
/// then calls `ping`.
pub fn local_address_changed(&mut self) {
    // Only `PathId::ZERO` is handled here; other paths are left untouched.
    self.update_rem_cid(PathId::ZERO);
    self.ping();
}
4942
4943 fn update_rem_cid(&mut self, path_id: PathId) {
4945 let Some((reset_token, retired)) =
4946 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4947 else {
4948 return;
4949 };
4950
4951 self.spaces[SpaceId::Data]
4953 .pending
4954 .retire_cids
4955 .extend(retired.map(|seq| (path_id, seq)));
4956 let remote = self.path_data(path_id).remote;
4957 self.set_reset_token(path_id, remote, reset_token);
4958 }
4959
4960 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4969 self.endpoint_events
4970 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4971
4972 if path_id == PathId::ZERO {
4978 self.peer_params.stateless_reset_token = Some(reset_token);
4979 }
4980 }
4981
4982 fn issue_first_cids(&mut self, now: Instant) {
4984 if self
4985 .local_cid_state
4986 .get(&PathId::ZERO)
4987 .expect("PathId::ZERO exists when the connection is created")
4988 .cid_len()
4989 == 0
4990 {
4991 return;
4992 }
4993
4994 let mut n = self.peer_params.issue_cids_limit() - 1;
4996 if let ConnectionSide::Server { server_config } = &self.side {
4997 if server_config.has_preferred_address() {
4998 n -= 1;
5000 }
5001 }
5002 self.endpoint_events
5003 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5004 }
5005
5006 fn issue_first_path_cids(&mut self, now: Instant) {
5010 if let Some(max_path_id) = self.max_path_id() {
5011 let mut path_id = self.max_path_id_with_cids.next();
5012 while path_id <= max_path_id {
5013 self.endpoint_events
5014 .push_back(EndpointEventInner::NeedIdentifiers(
5015 path_id,
5016 now,
5017 self.peer_params.issue_cids_limit(),
5018 ));
5019 path_id = path_id.next();
5020 }
5021 self.max_path_id_with_cids = max_path_id;
5022 }
5023 }
5024
5025 fn populate_packet(
5033 &mut self,
5034 now: Instant,
5035 space_id: SpaceId,
5036 path_id: PathId,
5037 path_exclusive_only: bool,
5038 buf: &mut impl BufMut,
5039 pn: u64,
5040 #[allow(unused)] qlog: &mut QlogSentPacket,
5041 ) -> SentFrames {
5042 let mut sent = SentFrames::default();
5043 let is_multipath_negotiated = self.is_multipath_negotiated();
5044 let space = &mut self.spaces[space_id];
5045 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5046 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5047 space
5048 .for_path(path_id)
5049 .pending_acks
5050 .maybe_ack_non_eliciting();
5051
5052 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5054 trace!("HANDSHAKE_DONE");
5055 buf.write(frame::FrameType::HANDSHAKE_DONE);
5056 qlog.frame(&Frame::HandshakeDone);
5057 sent.retransmits.get_or_create().handshake_done = true;
5058 self.stats.frame_tx.handshake_done =
5060 self.stats.frame_tx.handshake_done.saturating_add(1);
5061 }
5062
5063 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5066 while let Some(local_addr) = addresses.pop() {
5067 let reach_out = frame::ReachOut::new(*round, local_addr);
5068 if buf.remaining_mut() > reach_out.size() {
5069 trace!(%round, ?local_addr, "REACH_OUT");
5070 reach_out.write(buf);
5071 let sent_reachouts = sent
5072 .retransmits
5073 .get_or_create()
5074 .reach_out
5075 .get_or_insert_with(|| (*round, Default::default()));
5076 sent_reachouts.1.push(local_addr);
5077 self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5078 qlog.frame(&Frame::ReachOut(reach_out));
5079 } else {
5080 addresses.push(local_addr);
5081 break;
5082 }
5083 }
5084 if addresses.is_empty() {
5085 space.pending.reach_out = None;
5086 }
5087 }
5088
5089 if !path_exclusive_only
5091 && space_id == SpaceId::Data
5092 && self
5093 .config
5094 .address_discovery_role
5095 .should_report(&self.peer_params.address_discovery_role)
5096 && (!path.observed_addr_sent || space.pending.observed_addr)
5097 {
5098 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5099 if buf.remaining_mut() > frame.size() {
5100 trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5101 frame.write(buf);
5102
5103 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5104 path.observed_addr_sent = true;
5105
5106 self.stats.frame_tx.observed_addr += 1;
5107 sent.retransmits.get_or_create().observed_addr = true;
5108 space.pending.observed_addr = false;
5109 qlog.frame(&Frame::ObservedAddr(frame));
5110 }
5111 }
5112
5113 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5115 trace!("PING");
5116 buf.write(frame::FrameType::PING);
5117 sent.non_retransmits = true;
5118 self.stats.frame_tx.ping += 1;
5119 qlog.frame(&Frame::Ping);
5120 }
5121
5122 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5124 debug_assert_eq!(
5125 space_id,
5126 SpaceId::Data,
5127 "immediate acks must be sent in the data space"
5128 );
5129 trace!("IMMEDIATE_ACK");
5130 buf.write(frame::FrameType::IMMEDIATE_ACK);
5131 sent.non_retransmits = true;
5132 self.stats.frame_tx.immediate_ack += 1;
5133 qlog.frame(&Frame::ImmediateAck);
5134 }
5135
5136 if !path_exclusive_only {
5140 for path_id in space
5141 .number_spaces
5142 .iter_mut()
5143 .filter(|(_, pns)| pns.pending_acks.can_send())
5144 .map(|(&path_id, _)| path_id)
5145 .collect::<Vec<_>>()
5146 {
5147 Self::populate_acks(
5148 now,
5149 self.receiving_ecn,
5150 &mut sent,
5151 path_id,
5152 space_id,
5153 space,
5154 is_multipath_negotiated,
5155 buf,
5156 &mut self.stats,
5157 qlog,
5158 );
5159 }
5160 }
5161
5162 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5164 let sequence_number = self.ack_frequency.next_sequence_number();
5165
5166 let config = self.config.ack_frequency_config.as_ref().unwrap();
5168
5169 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5171 path.rtt.get(),
5172 config,
5173 &self.peer_params,
5174 );
5175
5176 trace!(?max_ack_delay, "ACK_FREQUENCY");
5177
5178 let frame = frame::AckFrequency {
5179 sequence: sequence_number,
5180 ack_eliciting_threshold: config.ack_eliciting_threshold,
5181 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5182 reordering_threshold: config.reordering_threshold,
5183 };
5184 frame.encode(buf);
5185 qlog.frame(&Frame::AckFrequency(frame));
5186
5187 sent.retransmits.get_or_create().ack_frequency = true;
5188
5189 self.ack_frequency
5190 .ack_frequency_sent(path_id, pn, max_ack_delay);
5191 self.stats.frame_tx.ack_frequency += 1;
5192 }
5193
5194 if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5196 && space_id == SpaceId::Data
5197 && path.send_new_challenge
5198 && !self.state.is_closed()
5199 {
5201 path.send_new_challenge = false;
5202
5203 let token = self.rng.random();
5205 let info = paths::SentChallengeInfo {
5206 sent_instant: now,
5207 remote: path.remote,
5208 };
5209 path.challenges_sent.insert(token, info);
5210 sent.non_retransmits = true;
5211 sent.requires_padding = true;
5212 let challenge = frame::PathChallenge(token);
5213 trace!(%challenge, "sending new challenge");
5214 buf.write(challenge);
5215 qlog.frame(&Frame::PathChallenge(challenge));
5216 self.stats.frame_tx.path_challenge += 1;
5217 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5218 self.timers.set(
5219 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5220 now + pto,
5221 self.qlog.with_time(now),
5222 );
5223
5224 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5225 space.pending.path_status.insert(path_id);
5227 }
5228
5229 if space_id == SpaceId::Data
5232 && self
5233 .config
5234 .address_discovery_role
5235 .should_report(&self.peer_params.address_discovery_role)
5236 {
5237 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5238 if buf.remaining_mut() > frame.size() {
5239 frame.write(buf);
5240 qlog.frame(&Frame::ObservedAddr(frame));
5241
5242 self.next_observed_addr_seq_no =
5243 self.next_observed_addr_seq_no.saturating_add(1u8);
5244 path.observed_addr_sent = true;
5245
5246 self.stats.frame_tx.observed_addr += 1;
5247 sent.retransmits.get_or_create().observed_addr = true;
5248 space.pending.observed_addr = false;
5249 }
5250 }
5251 }
5252
5253 if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5255 if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5256 sent.non_retransmits = true;
5257 sent.requires_padding = true;
5258 let response = frame::PathResponse(token);
5259 trace!(%response, "sending response");
5260 buf.write(response);
5261 qlog.frame(&Frame::PathResponse(response));
5262 self.stats.frame_tx.path_response += 1;
5263
5264 if space_id == SpaceId::Data
5268 && self
5269 .config
5270 .address_discovery_role
5271 .should_report(&self.peer_params.address_discovery_role)
5272 {
5273 let frame =
5274 frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5275 if buf.remaining_mut() > frame.size() {
5276 frame.write(buf);
5277 qlog.frame(&Frame::ObservedAddr(frame));
5278
5279 self.next_observed_addr_seq_no =
5280 self.next_observed_addr_seq_no.saturating_add(1u8);
5281 path.observed_addr_sent = true;
5282
5283 self.stats.frame_tx.observed_addr += 1;
5284 sent.retransmits.get_or_create().observed_addr = true;
5285 space.pending.observed_addr = false;
5286 }
5287 }
5288 }
5289 }
5290
5291 while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5293 let mut frame = match space.pending.crypto.pop_front() {
5294 Some(x) => x,
5295 None => break,
5296 };
5297
5298 let max_crypto_data_size = buf.remaining_mut()
5303 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5305 - 2; let len = frame
5308 .data
5309 .len()
5310 .min(2usize.pow(14) - 1)
5311 .min(max_crypto_data_size);
5312
5313 let data = frame.data.split_to(len);
5314 let truncated = frame::Crypto {
5315 offset: frame.offset,
5316 data,
5317 };
5318 trace!(
5319 "CRYPTO: off {} len {}",
5320 truncated.offset,
5321 truncated.data.len()
5322 );
5323 truncated.encode(buf);
5324 self.stats.frame_tx.crypto += 1;
5325
5326 #[cfg(feature = "qlog")]
5328 qlog.frame(&Frame::Crypto(truncated.clone()));
5329 sent.retransmits.get_or_create().crypto.push_back(truncated);
5330 if !frame.data.is_empty() {
5331 frame.offset += len as u64;
5332 space.pending.crypto.push_front(frame);
5333 }
5334 }
5335
5336 while !path_exclusive_only
5339 && space_id == SpaceId::Data
5340 && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5341 {
5342 let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5343 break;
5344 };
5345 let frame = frame::PathAbandon {
5346 path_id,
5347 error_code,
5348 };
5349 frame.encode(buf);
5350 qlog.frame(&Frame::PathAbandon(frame));
5351 self.stats.frame_tx.path_abandon += 1;
5352 trace!(%path_id, "PATH_ABANDON");
5353 sent.retransmits
5354 .get_or_create()
5355 .path_abandon
5356 .entry(path_id)
5357 .or_insert(error_code);
5358 }
5359
5360 while !path_exclusive_only
5362 && space_id == SpaceId::Data
5363 && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5364 {
5365 let Some(path_id) = space.pending.path_status.pop_first() else {
5366 break;
5367 };
5368 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5369 trace!(%path_id, "discarding queued path status for unknown path");
5370 continue;
5371 };
5372
5373 let seq = path.status.seq();
5374 sent.retransmits.get_or_create().path_status.insert(path_id);
5375 match path.local_status() {
5376 PathStatus::Available => {
5377 let frame = frame::PathStatusAvailable {
5378 path_id,
5379 status_seq_no: seq,
5380 };
5381 frame.encode(buf);
5382 qlog.frame(&Frame::PathStatusAvailable(frame));
5383 self.stats.frame_tx.path_status_available += 1;
5384 trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5385 }
5386 PathStatus::Backup => {
5387 let frame = frame::PathStatusBackup {
5388 path_id,
5389 status_seq_no: seq,
5390 };
5391 frame.encode(buf);
5392 qlog.frame(&Frame::PathStatusBackup(frame));
5393 self.stats.frame_tx.path_status_backup += 1;
5394 trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5395 }
5396 }
5397 }
5398
5399 if space_id == SpaceId::Data
5401 && space.pending.max_path_id
5402 && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5403 {
5404 let frame = frame::MaxPathId(self.local_max_path_id);
5405 frame.encode(buf);
5406 qlog.frame(&Frame::MaxPathId(frame));
5407 space.pending.max_path_id = false;
5408 sent.retransmits.get_or_create().max_path_id = true;
5409 trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5410 self.stats.frame_tx.max_path_id += 1;
5411 }
5412
5413 if space_id == SpaceId::Data
5415 && space.pending.paths_blocked
5416 && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5417 {
5418 let frame = frame::PathsBlocked(self.remote_max_path_id);
5419 frame.encode(buf);
5420 qlog.frame(&Frame::PathsBlocked(frame));
5421 space.pending.paths_blocked = false;
5422 sent.retransmits.get_or_create().paths_blocked = true;
5423 trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5424 self.stats.frame_tx.paths_blocked += 1;
5425 }
5426
5427 while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5429 {
5430 let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5431 break;
5432 };
5433 let next_seq = match self.rem_cids.get(&path_id) {
5434 Some(cid_queue) => cid_queue.active_seq() + 1,
5435 None => 0,
5436 };
5437 let frame = frame::PathCidsBlocked {
5438 path_id,
5439 next_seq: VarInt(next_seq),
5440 };
5441 frame.encode(buf);
5442 qlog.frame(&Frame::PathCidsBlocked(frame));
5443 sent.retransmits
5444 .get_or_create()
5445 .path_cids_blocked
5446 .push(path_id);
5447 trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5448 self.stats.frame_tx.path_cids_blocked += 1;
5449 }
5450
5451 if space_id == SpaceId::Data {
5453 self.streams.write_control_frames(
5454 buf,
5455 &mut space.pending,
5456 &mut sent.retransmits,
5457 &mut self.stats.frame_tx,
5458 qlog,
5459 );
5460 }
5461
5462 let cid_len = self
5464 .local_cid_state
5465 .values()
5466 .map(|cid_state| cid_state.cid_len())
5467 .max()
5468 .expect("some local CID state must exist");
5469 let new_cid_size_bound =
5470 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5471 while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5472 let issued = match space.pending.new_cids.pop() {
5473 Some(x) => x,
5474 None => break,
5475 };
5476 let retire_prior_to = self
5477 .local_cid_state
5478 .get(&issued.path_id)
5479 .map(|cid_state| cid_state.retire_prior_to())
5480 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5481
5482 let cid_path_id = match is_multipath_negotiated {
5483 true => {
5484 trace!(
5485 path_id = ?issued.path_id,
5486 sequence = issued.sequence,
5487 id = %issued.id,
5488 "PATH_NEW_CONNECTION_ID",
5489 );
5490 self.stats.frame_tx.path_new_connection_id += 1;
5491 Some(issued.path_id)
5492 }
5493 false => {
5494 trace!(
5495 sequence = issued.sequence,
5496 id = %issued.id,
5497 "NEW_CONNECTION_ID"
5498 );
5499 debug_assert_eq!(issued.path_id, PathId::ZERO);
5500 self.stats.frame_tx.new_connection_id += 1;
5501 None
5502 }
5503 };
5504 let frame = frame::NewConnectionId {
5505 path_id: cid_path_id,
5506 sequence: issued.sequence,
5507 retire_prior_to,
5508 id: issued.id,
5509 reset_token: issued.reset_token,
5510 };
5511 frame.encode(buf);
5512 sent.retransmits.get_or_create().new_cids.push(issued);
5513 qlog.frame(&Frame::NewConnectionId(frame));
5514 }
5515
5516 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5518 while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5519 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5520 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5521 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5522 self.stats.frame_tx.retire_connection_id += 1;
5523 (None, seq)
5524 }
5525 Some((path_id, seq)) => {
5526 trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5527 self.stats.frame_tx.path_retire_connection_id += 1;
5528 (Some(path_id), seq)
5529 }
5530 None => break,
5531 };
5532 let frame = frame::RetireConnectionId { path_id, sequence };
5533 frame.encode(buf);
5534 qlog.frame(&Frame::RetireConnectionId(frame));
5535 sent.retransmits
5536 .get_or_create()
5537 .retire_cids
5538 .push((path_id.unwrap_or_default(), sequence));
5539 }
5540
5541 let mut sent_datagrams = false;
5543 while !path_exclusive_only
5544 && buf.remaining_mut() > Datagram::SIZE_BOUND
5545 && space_id == SpaceId::Data
5546 {
5547 let prev_remaining = buf.remaining_mut();
5548 match self.datagrams.write(buf) {
5549 true => {
5550 sent_datagrams = true;
5551 sent.non_retransmits = true;
5552 self.stats.frame_tx.datagram += 1;
5553 qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5554 }
5555 false => break,
5556 }
5557 }
5558 if self.datagrams.send_blocked && sent_datagrams {
5559 self.events.push_back(Event::DatagramsUnblocked);
5560 self.datagrams.send_blocked = false;
5561 }
5562
5563 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5564
5565 while let Some(remote_addr) = space.pending.new_tokens.pop() {
5567 if path_exclusive_only {
5568 break;
5569 }
5570 debug_assert_eq!(space_id, SpaceId::Data);
5571 let ConnectionSide::Server { server_config } = &self.side else {
5572 panic!("NEW_TOKEN frames should not be enqueued by clients");
5573 };
5574
5575 if remote_addr != path.remote {
5576 continue;
5581 }
5582
5583 let token = Token::new(
5584 TokenPayload::Validation {
5585 ip: remote_addr.ip(),
5586 issued: server_config.time_source.now(),
5587 },
5588 &mut self.rng,
5589 );
5590 let new_token = NewToken {
5591 token: token.encode(&*server_config.token_key).into(),
5592 };
5593
5594 if buf.remaining_mut() < new_token.size() {
5595 space.pending.new_tokens.push(remote_addr);
5596 break;
5597 }
5598
5599 trace!("NEW_TOKEN");
5600 new_token.encode(buf);
5601 qlog.frame(&Frame::NewToken(new_token));
5602 sent.retransmits
5603 .get_or_create()
5604 .new_tokens
5605 .push(remote_addr);
5606 self.stats.frame_tx.new_token += 1;
5607 }
5608
5609 if !path_exclusive_only && space_id == SpaceId::Data {
5611 sent.stream_frames =
5612 self.streams
5613 .write_stream_frames(buf, self.config.send_fairness, qlog);
5614 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5615 }
5616
5617 while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5620 if let Some(added_address) = space.pending.add_address.pop_last() {
5621 trace!(
5622 seq = %added_address.seq_no,
5623 ip = ?added_address.ip,
5624 port = added_address.port,
5625 "ADD_ADDRESS",
5626 );
5627 added_address.write(buf);
5628 sent.retransmits
5629 .get_or_create()
5630 .add_address
5631 .insert(added_address);
5632 self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5633 qlog.frame(&Frame::AddAddress(added_address));
5634 } else {
5635 break;
5636 }
5637 }
5638
5639 while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5641 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5642 trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5643 removed_address.write(buf);
5644 sent.retransmits
5645 .get_or_create()
5646 .remove_address
5647 .insert(removed_address);
5648 self.stats.frame_tx.remove_address =
5649 self.stats.frame_tx.remove_address.saturating_add(1);
5650 qlog.frame(&Frame::RemoveAddress(removed_address));
5651 } else {
5652 break;
5653 }
5654 }
5655
5656 sent
5657 }
5658
5659 fn populate_acks(
5661 now: Instant,
5662 receiving_ecn: bool,
5663 sent: &mut SentFrames,
5664 path_id: PathId,
5665 space_id: SpaceId,
5666 space: &mut PacketSpace,
5667 is_multipath_negotiated: bool,
5668 buf: &mut impl BufMut,
5669 stats: &mut ConnectionStats,
5670 #[allow(unused)] qlog: &mut QlogSentPacket,
5671 ) {
5672 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5674
5675 debug_assert!(
5676 is_multipath_negotiated || path_id == PathId::ZERO,
5677 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5678 );
5679 if is_multipath_negotiated {
5680 debug_assert!(
5681 space_id == SpaceId::Data || path_id == PathId::ZERO,
5682 "path acks must be sent in 1RTT space (have {space_id:?})"
5683 );
5684 }
5685
5686 let pns = space.for_path(path_id);
5687 let ranges = pns.pending_acks.ranges();
5688 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5689 let ecn = if receiving_ecn {
5690 Some(&pns.ecn_counters)
5691 } else {
5692 None
5693 };
5694 if let Some(max) = ranges.max() {
5695 sent.largest_acked.insert(path_id, max);
5696 }
5697
5698 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5699 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5701 let delay = delay_micros >> ack_delay_exp.into_inner();
5702
5703 if is_multipath_negotiated && space_id == SpaceId::Data {
5704 if !ranges.is_empty() {
5705 trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5706 frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5707 qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5708 stats.frame_tx.path_acks += 1;
5709 }
5710 } else {
5711 trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5712 frame::Ack::encode(delay as _, ranges, ecn, buf);
5713 stats.frame_tx.acks += 1;
5714 qlog.frame_ack(delay, ranges, ecn);
5715 }
5716 }
5717
5718 fn close_common(&mut self) {
5719 trace!("connection closed");
5720 self.timers.reset();
5721 }
5722
5723 fn set_close_timer(&mut self, now: Instant) {
5724 let pto_max = self.pto_max_path(self.highest_space, true);
5727 self.timers.set(
5728 Timer::Conn(ConnTimer::Close),
5729 now + 3 * pto_max,
5730 self.qlog.with_time(now),
5731 );
5732 }
5733
5734 fn handle_peer_params(
5739 &mut self,
5740 params: TransportParameters,
5741 loc_cid: ConnectionId,
5742 rem_cid: ConnectionId,
5743 now: Instant,
5744 ) -> Result<(), TransportError> {
5745 if Some(self.orig_rem_cid) != params.initial_src_cid
5746 || (self.side.is_client()
5747 && (Some(self.initial_dst_cid) != params.original_dst_cid
5748 || self.retry_src_cid != params.retry_src_cid))
5749 {
5750 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5751 "CID authentication failure",
5752 ));
5753 }
5754 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5755 return Err(TransportError::PROTOCOL_VIOLATION(
5756 "multipath must not use zero-length CIDs",
5757 ));
5758 }
5759
5760 self.set_peer_params(params);
5761 self.qlog.emit_peer_transport_params_received(self, now);
5762
5763 Ok(())
5764 }
5765
5766 fn set_peer_params(&mut self, params: TransportParameters) {
5767 self.streams.set_params(¶ms);
5768 self.idle_timeout =
5769 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5770 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5771
5772 if let Some(ref info) = params.preferred_address {
5773 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5775 path_id: None,
5776 sequence: 1,
5777 id: info.connection_id,
5778 reset_token: info.stateless_reset_token,
5779 retire_prior_to: 0,
5780 })
5781 .expect(
5782 "preferred address CID is the first received, and hence is guaranteed to be legal",
5783 );
5784 let remote = self.path_data(PathId::ZERO).remote;
5785 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5786 }
5787 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5788
5789 let mut multipath_enabled = None;
5790 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5791 self.config.get_initial_max_path_id(),
5792 params.initial_max_path_id,
5793 ) {
5794 self.local_max_path_id = local_max_path_id;
5796 self.remote_max_path_id = remote_max_path_id;
5797 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5798 debug!(%initial_max_path_id, "multipath negotiated");
5799 multipath_enabled = Some(initial_max_path_id);
5800 }
5801
5802 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5803 self.config
5804 .max_remote_nat_traversal_addresses
5805 .zip(params.max_remote_nat_traversal_addresses)
5806 {
5807 if let Some(max_initial_paths) =
5808 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5809 {
5810 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5811 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5812 self.iroh_hp =
5813 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5814 debug!(
5815 %max_remote_addresses, %max_local_addresses,
5816 "iroh hole punching negotiated"
5817 );
5818
5819 match self.side() {
5820 Side::Client => {
5821 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5822 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5825 } else if max_local_addresses as u64
5826 > params.active_connection_id_limit.into_inner()
5827 {
5828 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5832 }
5833 }
5834 Side::Server => {
5835 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5836 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5837 }
5838 }
5839 }
5840 } else {
5841 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5842 }
5843 }
5844
5845 self.peer_params = params;
5846 let peer_max_udp_payload_size =
5847 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5848 self.path_data_mut(PathId::ZERO)
5849 .mtud
5850 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5851 }
5852
5853 fn decrypt_packet(
5855 &mut self,
5856 now: Instant,
5857 path_id: PathId,
5858 packet: &mut Packet,
5859 ) -> Result<Option<u64>, Option<TransportError>> {
5860 let result = packet_crypto::decrypt_packet_body(
5861 packet,
5862 path_id,
5863 &self.spaces,
5864 self.zero_rtt_crypto.as_ref(),
5865 self.key_phase,
5866 self.prev_crypto.as_ref(),
5867 self.next_crypto.as_ref(),
5868 )?;
5869
5870 let result = match result {
5871 Some(r) => r,
5872 None => return Ok(None),
5873 };
5874
5875 if result.outgoing_key_update_acked {
5876 if let Some(prev) = self.prev_crypto.as_mut() {
5877 prev.end_packet = Some((result.number, now));
5878 self.set_key_discard_timer(now, packet.header.space());
5879 }
5880 }
5881
5882 if result.incoming_key_update {
5883 trace!("key update authenticated");
5884 self.update_keys(Some((result.number, now)), true);
5885 self.set_key_discard_timer(now, packet.header.space());
5886 }
5887
5888 Ok(Some(result.number))
5889 }
5890
5891 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5892 trace!("executing key update");
5893 let new = self
5897 .crypto
5898 .next_1rtt_keys()
5899 .expect("only called for `Data` packets");
5900 self.key_phase_size = new
5901 .local
5902 .confidentiality_limit()
5903 .saturating_sub(KEY_UPDATE_MARGIN);
5904 let old = mem::replace(
5905 &mut self.spaces[SpaceId::Data]
5906 .crypto
5907 .as_mut()
5908 .unwrap() .packet,
5910 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5911 );
5912 self.spaces[SpaceId::Data]
5913 .iter_paths_mut()
5914 .for_each(|s| s.sent_with_keys = 0);
5915 self.prev_crypto = Some(PrevCrypto {
5916 crypto: old,
5917 end_packet,
5918 update_unacked: remote,
5919 });
5920 self.key_phase = !self.key_phase;
5921 }
5922
5923 fn peer_supports_ack_frequency(&self) -> bool {
5924 self.peer_params.min_ack_delay.is_some()
5925 }
5926
5927 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5932 debug_assert_eq!(
5933 self.highest_space,
5934 SpaceId::Data,
5935 "immediate ack must be written in the data space"
5936 );
5937 self.spaces[self.highest_space]
5938 .for_path(path_id)
5939 .immediate_ack_pending = true;
5940 }
5941
/// Test helper: decrypts the packet carried by a datagram `event` and returns its payload.
///
/// Returns `None` when `event` is not a datagram event, or when header unprotection or body
/// decryption does not produce a packet.
///
/// # Panics
///
/// Panics if the datagram carries coalesced packets.
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let ConnectionEventInner::Datagram(DatagramConnectionEvent {
        path_id,
        first_decode,
        remaining,
        ..
    }) = &event.0
    else {
        return None;
    };

    if remaining.is_some() {
        panic!("Packets should never be coalesced in tests");
    }

    let zero_rtt = self.zero_rtt_crypto.as_ref();
    let unprotected = packet_crypto::unprotect_header(
        first_decode.clone(),
        &self.spaces,
        zero_rtt,
        self.peer_params.stateless_reset_token,
    )?;

    let mut packet = unprotected.packet?;
    packet_crypto::decrypt_packet_body(
        &mut packet,
        *path_id,
        &self.spaces,
        zero_rtt,
        self.key_phase,
        self.prev_crypto.as_ref(),
        self.next_crypto.as_ref(),
    )
    .ok()?;

    Some(packet.payload.to_vec())
}
5980
/// Test helper: in-flight byte count of `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    let primary = self.path_data(PathId::ZERO);
    primary.in_flight.bytes
}
5988
/// Test helper: congestion window of `PathId::ZERO` minus its in-flight bytes, saturating at
/// zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    let primary = self.path_data(PathId::ZERO);
    let window = primary.congestion.window();
    window.saturating_sub(primary.in_flight.bytes)
}
5997
/// Test helper: reports whether the connection has nothing scheduled but going idle.
///
/// Keep-alive (connection and per-path), `PushNewCid` and `KeyDiscard` timers are ignored.
/// Of the remaining timers, the earliest one must be the connection `Idle` timer; having no
/// remaining timers at all also counts as idle.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let earliest = self
        .timers
        .values()
        .into_iter()
        .filter(|(timer, _)| match timer {
            Timer::Conn(ConnTimer::KeepAlive | ConnTimer::PushNewCid | ConnTimer::KeyDiscard)
            | Timer::PerPath(_, PathTimer::PathKeepAlive) => false,
            _ => true,
        })
        .min_by_key(|(_, time)| *time);

    match earliest {
        Some((timer, _)) => timer == Timer::Conn(ConnTimer::Idle),
        None => true,
    }
}
6016
/// Test helper: value of the `sending_ecn` flag on `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    let primary = self.path_data(PathId::ZERO);
    primary.sending_ecn
}
6022
/// Test helper: value of the `total_recvd` counter on `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    let primary = self.path_data(PathId::ZERO);
    primary.total_recvd
}
6028
/// Test helper: the `active_seq` pair of the local CID state for `PathId::ZERO`.
///
/// # Panics
///
/// Panics if there is no local CID state for `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    let cid_state = self.local_cid_state.get(&PathId::ZERO).unwrap();
    cid_state.active_seq()
}
6036
/// Test helper: the `active_seq` pair of the local CID state for the path numbered
/// `path_id`.
///
/// # Panics
///
/// Panics if there is no local CID state for that path.
#[cfg(test)]
#[track_caller]
pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
    let key = PathId(path_id);
    let cid_state = self.local_cid_state.get(&key).unwrap();
    cid_state.active_seq()
}
6045
/// Test helper: passes `v` to `assign_retire_seq` on the local CID state of `PathId::ZERO`
/// and queues a `NeedIdentifiers` endpoint event carrying the returned count.
///
/// # Panics
///
/// Panics if there is no local CID state for `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let cid_state = self.local_cid_state.get_mut(&PathId::ZERO).unwrap();
    let n = cid_state.assign_retire_seq(v);
    let request = EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n);
    self.endpoint_events.push_back(request);
}
6058
/// Test helper: active sequence number of the remote CID queue for `PathId::ZERO`.
///
/// # Panics
///
/// Panics if there is no remote CID queue for `PathId::ZERO`.
#[cfg(test)]
pub(crate) fn active_rem_cid_seq(&self) -> u64 {
    let queue = self.rem_cids.get(&PathId::ZERO).unwrap();
    queue.active_seq()
}
6064
/// Test helper: current MTU of the given path.
#[cfg(test)]
pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
    let path = self.path_data(path_id);
    path.current_mtu()
}
6070
/// Test helper: sets `send_new_challenge` on every known path.
#[cfg(test)]
pub(crate) fn trigger_path_validation(&mut self) {
    self.paths
        .values_mut()
        .for_each(|state| state.data.send_new_challenge = true);
}
6078
6079 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6090 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6091 path.data.send_new_challenge
6092 || path
6093 .prev
6094 .as_ref()
6095 .is_some_and(|(_, path)| path.send_new_challenge)
6096 || !path.data.path_responses.is_empty()
6097 });
6098 let other = self.streams.can_send_stream_data()
6099 || self
6100 .datagrams
6101 .outgoing
6102 .front()
6103 .is_some_and(|x| x.size(true) <= max_size);
6104 SendableFrames {
6105 acks: false,
6106 other,
6107 close: false,
6108 path_exclusive,
6109 }
6110 }
6111
6112 fn kill(&mut self, reason: ConnectionError) {
6114 self.close_common();
6115 self.state.move_to_drained(Some(reason));
6116 self.endpoint_events.push_back(EndpointEventInner::Drained);
6117 }
6118
6119 pub fn current_mtu(&self) -> u16 {
6126 self.paths
6127 .iter()
6128 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6129 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6130 .min()
6131 .expect("There is always at least one available path")
6132 }
6133
6134 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6141 let pn_len = PacketNumber::new(
6142 pn,
6143 self.spaces[SpaceId::Data]
6144 .for_path(path)
6145 .largest_acked_packet
6146 .unwrap_or(0),
6147 )
6148 .len();
6149
6150 1 + self
6152 .rem_cids
6153 .get(&path)
6154 .map(|cids| cids.active().len())
6155 .unwrap_or(20) + pn_len
6157 + self.tag_len_1rtt()
6158 }
6159
6160 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6161 let pn_len = 4;
6162
6163 let cid_len = self
6164 .rem_cids
6165 .values()
6166 .map(|cids| cids.active().len())
6167 .max()
6168 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6172 }
6173
6174 fn tag_len_1rtt(&self) -> usize {
6175 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6176 Some(crypto) => Some(&*crypto.packet.local),
6177 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6178 };
6179 key.map_or(16, |x| x.tag_len())
6183 }
6184
6185 fn on_path_validated(&mut self, path_id: PathId) {
6187 self.path_data_mut(path_id).validated = true;
6188 let ConnectionSide::Server { server_config } = &self.side else {
6189 return;
6190 };
6191 let remote_addr = self.path_data(path_id).remote;
6192 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6193 new_tokens.clear();
6194 for _ in 0..server_config.validation_token.sent {
6195 new_tokens.push(remote_addr);
6196 }
6197 }
6198
6199 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6201 if let Some(path) = self.paths.get_mut(&path_id) {
6202 path.data.status.remote_update(status, status_seq_no);
6203 } else {
6204 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6205 }
6206 self.events.push_back(
6207 PathEvent::RemoteStatus {
6208 id: path_id,
6209 status,
6210 }
6211 .into(),
6212 );
6213 }
6214
6215 fn max_path_id(&self) -> Option<PathId> {
6224 if self.is_multipath_negotiated() {
6225 Some(self.remote_max_path_id.min(self.local_max_path_id))
6226 } else {
6227 None
6228 }
6229 }
6230
6231 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6233 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6234 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6235 };
6236 Ok(())
6237 }
6238
6239 pub fn remove_nat_traversal_address(
6243 &mut self,
6244 address: SocketAddr,
6245 ) -> Result<(), iroh_hp::Error> {
6246 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6247 self.spaces[SpaceId::Data]
6248 .pending
6249 .remove_address
6250 .insert(removed);
6251 }
6252 Ok(())
6253 }
6254
6255 pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6257 self.iroh_hp.get_local_nat_traversal_addresses()
6258 }
6259
6260 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6262 Ok(self
6263 .iroh_hp
6264 .client_side()?
6265 .get_remote_nat_traversal_addresses())
6266 }
6267
6268 pub fn initiate_nat_traversal_round<F>(
6276 &mut self,
6277 now: Instant,
6278 rtt_hints: F,
6279 ) -> Result<Vec<SocketAddr>, iroh_hp::Error>
6280 where
6281 F: Fn(&SocketAddr) -> Option<Duration>,
6282 {
6283 if self.state.is_closed() {
6284 return Err(iroh_hp::Error::Closed);
6285 }
6286
6287 let client_state = self.iroh_hp.client_side_mut()?;
6288 let iroh_hp::NatTraversalRound {
6289 new_round,
6290 reach_out_at,
6291 addresses_to_probe,
6292 prev_round_path_ids,
6293 } = client_state.initiate_nat_traversal_round()?;
6294
6295 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6296
6297 for path_id in prev_round_path_ids {
6298 let validated = self
6301 .path(path_id)
6302 .map(|path| path.validated)
6303 .unwrap_or(false);
6304
6305 if !validated {
6306 let _ = self.close_path(
6307 now,
6308 path_id,
6309 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6310 );
6311 }
6312 }
6313
6314 let mut err = None;
6315
6316 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6317 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6318 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6319
6320 for (ip, port) in addresses_to_probe {
6321 let remote = match ip {
6323 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6324 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6325 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6326 IpAddr::V6(_) => {
6327 trace!("not using IPv6 nat candidate for IPv4 socket");
6328 continue;
6329 }
6330 };
6331 let rtt_hint = rtt_hints(&remote);
6332 match self.open_path_ensure(remote, PathStatus::Backup, now, rtt_hint) {
6333 Ok((path_id, path_was_known)) if !path_was_known => {
6334 path_ids.push(path_id);
6335 probed_addresses.push(remote);
6336 }
6337 Ok((path_id, _)) => {
6338 trace!(%path_id, %remote,"nat traversal: path existed for remote")
6339 }
6340 Err(e) => {
6341 debug!(%remote, %e,"nat traversal: failed to probe remote");
6342 err.get_or_insert(e);
6343 }
6344 }
6345 }
6346
6347 if let Some(err) = err {
6348 if probed_addresses.is_empty() {
6350 return Err(iroh_hp::Error::Multipath(err));
6351 }
6352 }
6353
6354 self.iroh_hp
6355 .client_side_mut()
6356 .expect("connection side validated")
6357 .set_round_path_ids(path_ids);
6358
6359 Ok(probed_addresses)
6360 }
6361}
6362
6363impl fmt::Debug for Connection {
6364 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6365 f.debug_struct("Connection")
6366 .field("handshake_cid", &self.handshake_cid)
6367 .finish()
6368 }
6369}
6370
/// Whether a path is blocked, and by what.
// NOTE(review): variant meanings below are read off the variant names; the use sites are
// outside this excerpt — confirm.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Presumably blocked by the anti-amplification limit.
    AntiAmplification,
    /// Presumably blocked by congestion control.
    Congestion,
    /// Presumably blocked by the pacer.
    Pacing,
}
6378
6379enum ConnectionSide {
6381 Client {
6382 token: Bytes,
6384 token_store: Arc<dyn TokenStore>,
6385 server_name: String,
6386 },
6387 Server {
6388 server_config: Arc<ServerConfig>,
6389 },
6390}
6391
6392impl ConnectionSide {
6393 fn remote_may_migrate(&self, state: &State) -> bool {
6394 match self {
6395 Self::Server { server_config } => server_config.migration,
6396 Self::Client { .. } => {
6397 if let Some(hs) = state.as_handshake() {
6398 hs.allow_server_migration
6399 } else {
6400 false
6401 }
6402 }
6403 }
6404 }
6405
6406 fn is_client(&self) -> bool {
6407 self.side().is_client()
6408 }
6409
6410 fn is_server(&self) -> bool {
6411 self.side().is_server()
6412 }
6413
6414 fn side(&self) -> Side {
6415 match *self {
6416 Self::Client { .. } => Side::Client,
6417 Self::Server { .. } => Side::Server,
6418 }
6419 }
6420}
6421
6422impl From<SideArgs> for ConnectionSide {
6423 fn from(side: SideArgs) -> Self {
6424 match side {
6425 SideArgs::Client {
6426 token_store,
6427 server_name,
6428 } => Self::Client {
6429 token: token_store.take(&server_name).unwrap_or_default(),
6430 token_store,
6431 server_name,
6432 },
6433 SideArgs::Server {
6434 server_config,
6435 pref_addr_cid: _,
6436 path_validated: _,
6437 } => Self::Server { server_config },
6438 }
6439 }
6440}
6441
6442pub(crate) enum SideArgs {
6444 Client {
6445 token_store: Arc<dyn TokenStore>,
6446 server_name: String,
6447 },
6448 Server {
6449 server_config: Arc<ServerConfig>,
6450 pref_addr_cid: Option<ConnectionId>,
6451 path_validated: bool,
6452 },
6453}
6454
6455impl SideArgs {
6456 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6457 match *self {
6458 Self::Client { .. } => None,
6459 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6460 }
6461 }
6462
6463 pub(crate) fn path_validated(&self) -> bool {
6464 match *self {
6465 Self::Client { .. } => true,
6466 Self::Server { path_validated, .. } => path_validated,
6467 }
6468 }
6469
6470 pub(crate) fn side(&self) -> Side {
6471 match *self {
6472 Self::Client { .. } => Side::Client,
6473 Self::Server { .. } => Side::Server,
6474 }
6475 }
6476}
6477
6478#[derive(Debug, Error, Clone, PartialEq, Eq)]
6480pub enum ConnectionError {
6481 #[error("peer doesn't implement any supported version")]
6483 VersionMismatch,
6484 #[error(transparent)]
6486 TransportError(#[from] TransportError),
6487 #[error("aborted by peer: {0}")]
6489 ConnectionClosed(frame::ConnectionClose),
6490 #[error("closed by peer: {0}")]
6492 ApplicationClosed(frame::ApplicationClose),
6493 #[error("reset by peer")]
6495 Reset,
6496 #[error("timed out")]
6502 TimedOut,
6503 #[error("closed")]
6505 LocallyClosed,
6506 #[error("CIDs exhausted")]
6510 CidsExhausted,
6511}
6512
6513impl From<Close> for ConnectionError {
6514 fn from(x: Close) -> Self {
6515 match x {
6516 Close::Connection(reason) => Self::ConnectionClosed(reason),
6517 Close::Application(reason) => Self::ApplicationClosed(reason),
6518 }
6519 }
6520}
6521
6522impl From<ConnectionError> for io::Error {
6524 fn from(x: ConnectionError) -> Self {
6525 use ConnectionError::*;
6526 let kind = match x {
6527 TimedOut => io::ErrorKind::TimedOut,
6528 Reset => io::ErrorKind::ConnectionReset,
6529 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6530 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6531 io::ErrorKind::Other
6532 }
6533 };
6534 Self::new(kind, x)
6535 }
6536}
6537
6538#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6541pub enum PathError {
6542 #[error("multipath extension not negotiated")]
6544 MultipathNotNegotiated,
6545 #[error("the server side may not open a path")]
6547 ServerSideNotAllowed,
6548 #[error("maximum number of concurrent paths reached")]
6550 MaxPathIdReached,
6551 #[error("remoted CIDs exhausted")]
6553 RemoteCidsExhausted,
6554 #[error("path validation failed")]
6556 ValidationFailed,
6557 #[error("invalid remote address")]
6559 InvalidRemoteAddress(SocketAddr),
6560}
6561
6562#[derive(Debug, Error, Clone, Eq, PartialEq)]
6564pub enum ClosePathError {
6565 #[error("closed path")]
6567 ClosedPath,
6568 #[error("last open path")]
6570 LastOpenPath,
6571}
6572
6573#[derive(Debug, Error, Clone, Copy)]
6574#[error("Multipath extension not negotiated")]
6575pub struct MultipathNotNegotiated {
6576 _private: (),
6577}
6578
6579#[derive(Debug)]
6581pub enum Event {
6582 HandshakeDataReady,
6584 Connected,
6586 HandshakeConfirmed,
6588 ConnectionLost {
6592 reason: ConnectionError,
6594 },
6595 Stream(StreamEvent),
6597 DatagramReceived,
6599 DatagramsUnblocked,
6601 Path(PathEvent),
6603 NatTraversal(iroh_hp::Event),
6605}
6606
6607impl From<PathEvent> for Event {
6608 fn from(source: PathEvent) -> Self {
6609 Self::Path(source)
6610 }
6611}
6612
6613fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6614 Duration::from_micros(params.max_ack_delay.0 * 1000)
6615}
6616
6617const MAX_BACKOFF_EXPONENT: u32 = 16;
6619
6620const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6628
6629const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6635 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6636
6637const KEY_UPDATE_MARGIN: u64 = 10_000;
6641
6642#[derive(Default)]
6643struct SentFrames {
6644 retransmits: ThinRetransmits,
6645 largest_acked: FxHashMap<PathId, u64>,
6647 stream_frames: StreamMetaVec,
6648 non_retransmits: bool,
6650 requires_padding: bool,
6652}
6653
6654impl SentFrames {
6655 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6657 !self.largest_acked.is_empty()
6658 && !self.non_retransmits
6659 && self.stream_frames.is_empty()
6660 && self.retransmits.is_empty(streams)
6661 }
6662}
6663
6664fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6672 match (x, y) {
6673 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6674 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6675 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6676 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6677 }
6678}
6679
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `negotiate_max_idle_timeout` against a table of cases, calling it with the
    /// arguments in both orders to confirm the result is order-independent.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let timeout = |raw: u64| Some(VarInt(raw));
        let millis = |ms: u64| Some(Duration::from_millis(ms));

        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, timeout(0), None),
            (None, timeout(2), millis(2)),
            (timeout(0), timeout(0), None),
            (timeout(2), timeout(0), millis(2)),
            (timeout(1), timeout(4), millis(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}