1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::NonZeroU32,
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23 TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 coding::BufMutExt,
27 config::{ServerConfig, TransportConfig},
28 congestion::Controller,
29 connection::{
30 qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
31 spaces::LostPacket,
32 timer::{ConnTimer, PathTimer},
33 },
34 crypto::{self, KeyPair, Keys, PacketKey},
35 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
36 iroh_hp,
37 packet::{
38 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
39 PacketNumber, PartialDecode, SpaceId,
40 },
41 range_set::ArrayRangeSet,
42 shared::{
43 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44 EndpointEvent, EndpointEventInner,
45 },
46 token::{ResetToken, Token, TokenPayload},
47 transport_parameters::TransportParameters,
48};
49
50mod ack_frequency;
51use ack_frequency::AckFrequencyState;
52
53mod assembler;
54pub use assembler::Chunk;
55
56mod cid_state;
57use cid_state::CidState;
58
59mod datagrams;
60use datagrams::DatagramState;
61pub use datagrams::{Datagrams, SendDatagramError};
62
63mod mtud;
64mod pacing;
65
66mod packet_builder;
67use packet_builder::{PacketBuilder, PadDatagram};
68
69mod packet_crypto;
70use packet_crypto::{PrevCrypto, ZeroRttCrypto};
71
72mod paths;
73pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
74use paths::{PathData, PathState};
75
76pub(crate) mod qlog;
77
78mod send_buffer;
79
80mod spaces;
81#[cfg(fuzzing)]
82pub use spaces::Retransmits;
83#[cfg(not(fuzzing))]
84use spaces::Retransmits;
85use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
86
87mod stats;
88pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
89
90mod streams;
91#[cfg(fuzzing)]
92pub use streams::StreamsState;
93#[cfg(not(fuzzing))]
94use streams::StreamsState;
95pub use streams::{
96 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
97 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
98};
99
100mod timer;
101use timer::{Timer, TimerTable};
102
103mod transmit_buf;
104use transmit_buf::TransmitBuf;
105
106mod state;
107
108#[cfg(not(fuzzing))]
109use state::State;
110#[cfg(fuzzing)]
111pub use state::State;
112use state::StateType;
113
114pub struct Connection {
154 endpoint_config: Arc<EndpointConfig>,
155 config: Arc<TransportConfig>,
156 rng: StdRng,
157 crypto: Box<dyn crypto::Session>,
158 handshake_cid: ConnectionId,
160 rem_handshake_cid: ConnectionId,
162 local_ip: Option<IpAddr>,
165 paths: BTreeMap<PathId, PathState>,
171 path_counter: u64,
175 allow_mtud: bool,
177 state: State,
178 side: ConnectionSide,
179 zero_rtt_enabled: bool,
181 zero_rtt_crypto: Option<ZeroRttCrypto>,
183 key_phase: bool,
184 key_phase_size: u64,
186 peer_params: TransportParameters,
188 orig_rem_cid: ConnectionId,
190 initial_dst_cid: ConnectionId,
192 retry_src_cid: Option<ConnectionId>,
195 events: VecDeque<Event>,
197 endpoint_events: VecDeque<EndpointEventInner>,
198 spin_enabled: bool,
200 spin: bool,
202 spaces: [PacketSpace; 3],
204 highest_space: SpaceId,
206 prev_crypto: Option<PrevCrypto>,
208 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
213 accepted_0rtt: bool,
214 permit_idle_reset: bool,
216 idle_timeout: Option<Duration>,
218 timers: TimerTable,
219 authentication_failures: u64,
221
222 close: bool,
227
228 ack_frequency: AckFrequencyState,
232
233 receiving_ecn: bool,
238 total_authed_packets: u64,
240 app_limited: bool,
243
244 next_observed_addr_seq_no: VarInt,
249
250 streams: StreamsState,
251 rem_cids: FxHashMap<PathId, CidQueue>,
257 local_cid_state: FxHashMap<PathId, CidState>,
264 datagrams: DatagramState,
266 stats: ConnectionStats,
268 path_stats: FxHashMap<PathId, PathStats>,
270 version: u32,
272
273 max_concurrent_paths: NonZeroU32,
282 local_max_path_id: PathId,
297 remote_max_path_id: PathId,
303 max_path_id_with_cids: PathId,
309 abandoned_paths: FxHashSet<PathId>,
317
318 iroh_hp: iroh_hp::State,
319 qlog: QlogSink,
320}
321
322impl Connection {
323 pub(crate) fn new(
324 endpoint_config: Arc<EndpointConfig>,
325 config: Arc<TransportConfig>,
326 init_cid: ConnectionId,
327 loc_cid: ConnectionId,
328 rem_cid: ConnectionId,
329 remote: SocketAddr,
330 local_ip: Option<IpAddr>,
331 crypto: Box<dyn crypto::Session>,
332 cid_gen: &dyn ConnectionIdGenerator,
333 now: Instant,
334 version: u32,
335 allow_mtud: bool,
336 rng_seed: [u8; 32],
337 side_args: SideArgs,
338 qlog: QlogSink,
339 ) -> Self {
340 let pref_addr_cid = side_args.pref_addr_cid();
341 let path_validated = side_args.path_validated();
342 let connection_side = ConnectionSide::from(side_args);
343 let side = connection_side.side();
344 let mut rng = StdRng::from_seed(rng_seed);
345 let initial_space = {
346 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
347 space.crypto = Some(crypto.initial_keys(init_cid, side));
348 space
349 };
350 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
351 #[cfg(test)]
352 let data_space = match config.deterministic_packet_numbers {
353 true => PacketSpace::new_deterministic(now, SpaceId::Data),
354 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
355 };
356 #[cfg(not(test))]
357 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
358 let state = State::handshake(state::Handshake {
359 rem_cid_set: side.is_server(),
360 expected_token: Bytes::new(),
361 client_hello: None,
362 allow_server_migration: side.is_client(),
363 });
364 let local_cid_state = FxHashMap::from_iter([(
365 PathId::ZERO,
366 CidState::new(
367 cid_gen.cid_len(),
368 cid_gen.cid_lifetime(),
369 now,
370 if pref_addr_cid.is_some() { 2 } else { 1 },
371 ),
372 )]);
373
374 let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
375 path.open = true;
377 let mut this = Self {
378 endpoint_config,
379 crypto,
380 handshake_cid: loc_cid,
381 rem_handshake_cid: rem_cid,
382 local_cid_state,
383 paths: BTreeMap::from_iter([(
384 PathId::ZERO,
385 PathState {
386 data: path,
387 prev: None,
388 },
389 )]),
390 path_counter: 0,
391 allow_mtud,
392 local_ip,
393 state,
394 side: connection_side,
395 zero_rtt_enabled: false,
396 zero_rtt_crypto: None,
397 key_phase: false,
398 key_phase_size: rng.random_range(10..1000),
405 peer_params: TransportParameters::default(),
406 orig_rem_cid: rem_cid,
407 initial_dst_cid: init_cid,
408 retry_src_cid: None,
409 events: VecDeque::new(),
410 endpoint_events: VecDeque::new(),
411 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
412 spin: false,
413 spaces: [initial_space, handshake_space, data_space],
414 highest_space: SpaceId::Initial,
415 prev_crypto: None,
416 next_crypto: None,
417 accepted_0rtt: false,
418 permit_idle_reset: true,
419 idle_timeout: match config.max_idle_timeout {
420 None | Some(VarInt(0)) => None,
421 Some(dur) => Some(Duration::from_millis(dur.0)),
422 },
423 timers: TimerTable::default(),
424 authentication_failures: 0,
425 close: false,
426
427 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
428 &TransportParameters::default(),
429 )),
430
431 app_limited: false,
432 receiving_ecn: false,
433 total_authed_packets: 0,
434
435 next_observed_addr_seq_no: 0u32.into(),
436
437 streams: StreamsState::new(
438 side,
439 config.max_concurrent_uni_streams,
440 config.max_concurrent_bidi_streams,
441 config.send_window,
442 config.receive_window,
443 config.stream_receive_window,
444 ),
445 datagrams: DatagramState::default(),
446 config,
447 rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
448 rng,
449 stats: ConnectionStats::default(),
450 path_stats: Default::default(),
451 version,
452
453 max_concurrent_paths: NonZeroU32::MIN,
455 local_max_path_id: PathId::ZERO,
456 remote_max_path_id: PathId::ZERO,
457 max_path_id_with_cids: PathId::ZERO,
458 abandoned_paths: Default::default(),
459
460 iroh_hp: Default::default(),
462 qlog,
463 };
464 if path_validated {
465 this.on_path_validated(PathId::ZERO);
466 }
467 if side.is_client() {
468 this.write_crypto();
470 this.init_0rtt(now);
471 }
472 this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
473 this
474 }
475
476 #[must_use]
484 pub fn poll_timeout(&mut self) -> Option<Instant> {
485 self.timers.peek()
486 }
487
488 #[must_use]
494 pub fn poll(&mut self) -> Option<Event> {
495 if let Some(x) = self.events.pop_front() {
496 return Some(x);
497 }
498
499 if let Some(event) = self.streams.poll() {
500 return Some(Event::Stream(event));
501 }
502
503 if let Some(reason) = self.state.take_error() {
504 return Some(Event::ConnectionLost { reason });
505 }
506
507 None
508 }
509
510 #[must_use]
512 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
513 self.endpoint_events.pop_front().map(EndpointEvent)
514 }
515
516 #[must_use]
518 pub fn streams(&mut self) -> Streams<'_> {
519 Streams {
520 state: &mut self.streams,
521 conn_state: &self.state,
522 }
523 }
524
525 #[must_use]
527 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
528 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
529 RecvStream {
530 id,
531 state: &mut self.streams,
532 pending: &mut self.spaces[SpaceId::Data].pending,
533 }
534 }
535
536 #[must_use]
538 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
539 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
540 SendStream {
541 id,
542 state: &mut self.streams,
543 pending: &mut self.spaces[SpaceId::Data].pending,
544 conn_state: &self.state,
545 }
546 }
547
548 pub fn open_path_ensure(
555 &mut self,
556 remote: SocketAddr,
557 initial_status: PathStatus,
558 now: Instant,
559 ) -> Result<(PathId, bool), PathError> {
560 match self
561 .paths
562 .iter()
563 .find(|(_id, path)| path.data.remote == remote)
564 {
565 Some((path_id, _state)) => Ok((*path_id, true)),
566 None => self
567 .open_path(remote, initial_status, now)
568 .map(|id| (id, false)),
569 }
570 }
571
572 pub fn open_path(
577 &mut self,
578 remote: SocketAddr,
579 initial_status: PathStatus,
580 now: Instant,
581 ) -> Result<PathId, PathError> {
582 if !self.is_multipath_negotiated() {
583 return Err(PathError::MultipathNotNegotiated);
584 }
585 if self.side().is_server() {
586 return Err(PathError::ServerSideNotAllowed);
587 }
588
589 let max_abandoned = self.abandoned_paths.iter().max().copied();
590 let max_used = self.paths.keys().last().copied();
591 let path_id = max_abandoned
592 .max(max_used)
593 .unwrap_or(PathId::ZERO)
594 .saturating_add(1u8);
595
596 if Some(path_id) > self.max_path_id() {
597 return Err(PathError::MaxPathIdReached);
598 }
599 if path_id > self.remote_max_path_id {
600 self.spaces[SpaceId::Data].pending.paths_blocked = true;
601 return Err(PathError::MaxPathIdReached);
602 }
603 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
604 self.spaces[SpaceId::Data]
605 .pending
606 .path_cids_blocked
607 .push(path_id);
608 return Err(PathError::RemoteCidsExhausted);
609 }
610
611 let path = self.ensure_path(path_id, remote, now, None);
612 path.status.local_update(initial_status);
613
614 Ok(path_id)
615 }
616
617 pub fn close_path(
623 &mut self,
624 now: Instant,
625 path_id: PathId,
626 error_code: VarInt,
627 ) -> Result<(), ClosePathError> {
628 if self.abandoned_paths.contains(&path_id)
629 || Some(path_id) > self.max_path_id()
630 || !self.paths.contains_key(&path_id)
631 {
632 return Err(ClosePathError::ClosedPath);
633 }
634 if self
635 .paths
636 .iter()
637 .any(|(id, path)| {
639 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
640 })
641 .not()
642 {
643 return Err(ClosePathError::LastOpenPath);
644 }
645
646 self.spaces[SpaceId::Data]
648 .pending
649 .path_abandon
650 .insert(path_id, error_code.into());
651
652 let pending_space = &mut self.spaces[SpaceId::Data].pending;
654 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
655 pending_space.path_cids_blocked.retain(|&id| id != path_id);
656 pending_space.path_status.retain(|&id| id != path_id);
657
658 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
660 for sent_packet in space.sent_packets.values_mut() {
661 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
662 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
663 retransmits.path_cids_blocked.retain(|&id| id != path_id);
664 retransmits.path_status.retain(|&id| id != path_id);
665 }
666 }
667 }
668
669 self.rem_cids.remove(&path_id);
675 self.endpoint_events
676 .push_back(EndpointEventInner::RetireResetToken(path_id));
677
678 let pto = self.pto_max_path(SpaceId::Data);
679
680 let path = self.paths.get_mut(&path_id).expect("checked above");
681
682 path.data.last_allowed_receive = Some(now + 3 * pto);
684 self.abandoned_paths.insert(path_id);
685
686 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
687
688 self.timers.set(
693 Timer::PerPath(path_id, PathTimer::DiscardPath),
694 now + 6 * pto,
695 self.qlog.with_time(now),
696 );
697 Ok(())
698 }
699
700 #[track_caller]
704 fn path_data(&self, path_id: PathId) -> &PathData {
705 if let Some(data) = self.paths.get(&path_id) {
706 &data.data
707 } else {
708 panic!(
709 "unknown path: {path_id}, currently known paths: {:?}",
710 self.paths.keys().collect::<Vec<_>>()
711 );
712 }
713 }
714
715 fn path(&self, path_id: PathId) -> Option<&PathData> {
717 self.paths.get(&path_id).map(|path_state| &path_state.data)
718 }
719
720 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
722 self.paths
723 .get_mut(&path_id)
724 .map(|path_state| &mut path_state.data)
725 }
726
727 pub fn paths(&self) -> Vec<PathId> {
731 self.paths.keys().copied().collect()
732 }
733
734 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
736 self.path(path_id)
737 .map(PathData::local_status)
738 .ok_or(ClosedPath { _private: () })
739 }
740
741 pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
743 self.path(path_id)
744 .map(|path| path.remote)
745 .ok_or(ClosedPath { _private: () })
746 }
747
748 pub fn set_path_status(
752 &mut self,
753 path_id: PathId,
754 status: PathStatus,
755 ) -> Result<PathStatus, SetPathStatusError> {
756 if !self.is_multipath_negotiated() {
757 return Err(SetPathStatusError::MultipathNotNegotiated);
758 }
759 let path = self
760 .path_mut(path_id)
761 .ok_or(SetPathStatusError::ClosedPath)?;
762 let prev = match path.status.local_update(status) {
763 Some(prev) => {
764 self.spaces[SpaceId::Data]
765 .pending
766 .path_status
767 .insert(path_id);
768 prev
769 }
770 None => path.local_status(),
771 };
772 Ok(prev)
773 }
774
775 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
780 self.path(path_id).and_then(|path| path.remote_status())
781 }
782
783 pub fn set_path_max_idle_timeout(
789 &mut self,
790 path_id: PathId,
791 timeout: Option<Duration>,
792 ) -> Result<Option<Duration>, ClosedPath> {
793 let path = self
794 .paths
795 .get_mut(&path_id)
796 .ok_or(ClosedPath { _private: () })?;
797 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
798 }
799
800 pub fn set_path_keep_alive_interval(
806 &mut self,
807 path_id: PathId,
808 interval: Option<Duration>,
809 ) -> Result<Option<Duration>, ClosedPath> {
810 let path = self
811 .paths
812 .get_mut(&path_id)
813 .ok_or(ClosedPath { _private: () })?;
814 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
815 }
816
817 #[track_caller]
821 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
822 &mut self.paths.get_mut(&path_id).expect("known path").data
823 }
824
825 fn is_remote_validated(&self, remote: SocketAddr) -> bool {
827 self.paths
828 .values()
829 .any(|path_state| path_state.data.remote == remote && path_state.data.validated)
830 }
833
834 fn ensure_path(
835 &mut self,
836 path_id: PathId,
837 remote: SocketAddr,
838 now: Instant,
839 pn: Option<u64>,
840 ) -> &mut PathData {
841 let validated = self.is_remote_validated(remote);
842 let vacant_entry = match self.paths.entry(path_id) {
843 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
844 btree_map::Entry::Occupied(occupied_entry) => {
845 return &mut occupied_entry.into_mut().data;
846 }
847 };
848
849 debug!(%validated, %path_id, ?remote, "path added");
850 let peer_max_udp_payload_size =
851 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
852 self.path_counter = self.path_counter.wrapping_add(1);
853 let mut data = PathData::new(
854 remote,
855 self.allow_mtud,
856 Some(peer_max_udp_payload_size),
857 self.path_counter,
858 now,
859 &self.config,
860 );
861
862 data.validated = validated;
863
864 let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
865 self.timers.set(
866 Timer::PerPath(path_id, PathTimer::PathOpen),
867 now + 3 * pto,
868 self.qlog.with_time(now),
869 );
870
871 data.send_new_challenge = true;
874
875 let path = vacant_entry.insert(PathState { data, prev: None });
876
877 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
878 if let Some(pn) = pn {
879 pn_space.dedup.insert(pn);
880 }
881 self.spaces[SpaceId::Data]
882 .number_spaces
883 .insert(path_id, pn_space);
884 self.qlog.emit_tuple_assigned(path_id, remote, now);
885 &mut path.data
886 }
887
888 #[must_use]
898 pub fn poll_transmit(
899 &mut self,
900 now: Instant,
901 max_datagrams: usize,
902 buf: &mut Vec<u8>,
903 ) -> Option<Transmit> {
904 assert!(max_datagrams != 0);
905 let max_datagrams = match self.config.enable_segmentation_offload {
906 false => 1,
907 true => max_datagrams,
908 };
909
910 let close = match self.state.as_type() {
929 StateType::Drained => {
930 self.app_limited = true;
931 return None;
932 }
933 StateType::Draining | StateType::Closed => {
934 if !self.close {
937 self.app_limited = true;
938 return None;
939 }
940 true
941 }
942 _ => false,
943 };
944
945 if let Some(config) = &self.config.ack_frequency_config {
947 let rtt = self
948 .paths
949 .values()
950 .map(|p| p.data.rtt.get())
951 .min()
952 .expect("one path exists");
953 self.spaces[SpaceId::Data].pending.ack_frequency = self
954 .ack_frequency
955 .should_send_ack_frequency(rtt, config, &self.peer_params)
956 && self.highest_space == SpaceId::Data
957 && self.peer_supports_ack_frequency();
958 }
959
960 let mut coalesce = true;
962
963 let mut pad_datagram = PadDatagram::No;
966
967 let mut congestion_blocked = false;
971
972 let mut last_packet_number = None;
974
975 let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
976
977 let have_available_path = self.paths.iter().any(|(id, path)| {
980 path.data.validated
981 && path.data.local_status() == PathStatus::Available
982 && self.rem_cids.contains_key(id)
983 });
984
985 let mut transmit = TransmitBuf::new(
987 buf,
988 max_datagrams,
989 self.path_data(path_id).current_mtu().into(),
990 );
991 if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
992 return Some(challenge);
993 }
994 let mut space_id = match path_id {
995 PathId::ZERO => SpaceId::Initial,
996 _ => SpaceId::Data,
997 };
998
999 loop {
1000 let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1002 let err = PathError::RemoteCidsExhausted;
1003 if !self.abandoned_paths.contains(&path_id) {
1004 debug!(?err, %path_id, "no active CID for path");
1005 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1006 id: path_id,
1007 error: err,
1008 }));
1009 self.close_path(
1013 now,
1014 path_id,
1015 TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1016 )
1017 .ok();
1018 self.spaces[SpaceId::Data]
1019 .pending
1020 .path_cids_blocked
1021 .push(path_id);
1022 } else {
1023 trace!(%path_id, "remote CIDs retired for abandoned path");
1024 }
1025
1026 match self.paths.keys().find(|&&next| next > path_id) {
1027 Some(next_path_id) => {
1028 path_id = *next_path_id;
1030 space_id = SpaceId::Data;
1031
1032 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1034 if let Some(challenge) =
1035 self.send_prev_path_challenge(now, &mut transmit, path_id)
1036 {
1037 return Some(challenge);
1038 }
1039
1040 continue;
1041 }
1042 None => {
1043 trace!(
1045 ?space_id,
1046 %path_id,
1047 "no CIDs to send on path, no more paths"
1048 );
1049 break;
1050 }
1051 }
1052 };
1053
1054 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1057 transmit.datagram_remaining_mut()
1059 } else {
1060 transmit.segment_size()
1062 };
1063 let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1064 let path_should_send = {
1065 let path_exclusive_only = space_id == SpaceId::Data
1066 && have_available_path
1067 && self.path_data(path_id).local_status() == PathStatus::Backup;
1068 let path_should_send = if path_exclusive_only {
1069 can_send.path_exclusive
1070 } else {
1071 !can_send.is_empty()
1072 };
1073 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1074 path_should_send || needs_loss_probe || can_send.close
1075 };
1076
1077 if !path_should_send && space_id < SpaceId::Data {
1078 if self.spaces[space_id].crypto.is_some() {
1079 trace!(?space_id, %path_id, "nothing to send in space");
1080 }
1081 space_id = space_id.next();
1082 continue;
1083 }
1084
1085 let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1086 self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1088 } else {
1089 PathBlocked::No
1090 };
1091 if send_blocked != PathBlocked::No {
1092 trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1093 congestion_blocked = true;
1094 }
1095 if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1096 space_id = space_id.next();
1099 continue;
1100 }
1101 if !path_should_send || send_blocked != PathBlocked::No {
1102 if transmit.num_datagrams() > 0 {
1107 break;
1108 }
1109
1110 if transmit.is_empty() {
1111 if let Some(probing) = self
1114 .iroh_hp
1115 .server_side_mut()
1116 .ok()
1117 .and_then(iroh_hp::ServerState::next_probe)
1118 {
1119 if let Some(new_cid) = self
1120 .rem_cids
1121 .get_mut(&path_id)
1122 .and_then(CidQueue::next_reserved)
1123 {
1124 let destination = probing.remote();
1125 let token: u64 = self.rng.random();
1126 let challenge = frame::PathChallenge(token);
1127 trace!(%destination, cid=%new_cid, %challenge, "Nat traversal: PATH_CHALLENGE packet");
1128 buf.write(challenge);
1129 QlogSentPacket::default().frame(&Frame::PathChallenge(challenge));
1130 self.stats.frame_tx.path_challenge += 1;
1131
1132 probing.finish(token);
1135 return Some(Transmit {
1136 destination,
1137 ecn: None,
1138 size: 8,
1139 segment_size: None,
1140 src_ip: None,
1141 });
1142 }
1143 }
1144 }
1145
1146 match self.paths.keys().find(|&&next| next > path_id) {
1147 Some(next_path_id) => {
1148 trace!(
1150 ?space_id,
1151 %path_id,
1152 %next_path_id,
1153 "nothing to send on path"
1154 );
1155 path_id = *next_path_id;
1156 space_id = SpaceId::Data;
1157
1158 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1160 if let Some(challenge) =
1161 self.send_prev_path_challenge(now, &mut transmit, path_id)
1162 {
1163 return Some(challenge);
1164 }
1165
1166 continue;
1167 }
1168 None => {
1169 trace!(
1171 ?space_id,
1172 %path_id,
1173 next_path_id=?None::<PathId>,
1174 "nothing to send on path"
1175 );
1176 break;
1177 }
1178 }
1179 }
1180
1181 if transmit.datagram_remaining_mut() == 0 {
1183 if transmit.num_datagrams() >= transmit.max_datagrams() {
1184 break;
1186 }
1187
1188 match self.spaces[space_id].for_path(path_id).loss_probes {
1189 0 => transmit.start_new_datagram(),
1190 _ => {
1191 let request_immediate_ack =
1193 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1194 self.spaces[space_id].maybe_queue_probe(
1195 path_id,
1196 request_immediate_ack,
1197 &self.streams,
1198 );
1199
1200 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1201
1202 transmit.start_new_datagram_with_size(std::cmp::min(
1206 usize::from(INITIAL_MTU),
1207 transmit.segment_size(),
1208 ));
1209 }
1210 }
1211 trace!(count = transmit.num_datagrams(), "new datagram started");
1212 coalesce = true;
1213 pad_datagram = PadDatagram::No;
1214 }
1215
1216 if transmit.datagram_start_offset() < transmit.len() {
1219 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1220 }
1221
1222 if self.spaces[SpaceId::Initial].crypto.is_some()
1227 && space_id == SpaceId::Handshake
1228 && self.side.is_client()
1229 {
1230 self.discard_space(now, SpaceId::Initial);
1233 }
1234 if let Some(ref mut prev) = self.prev_crypto {
1235 prev.update_unacked = false;
1236 }
1237
1238 let mut qlog = QlogSentPacket::default();
1239 let mut builder = PacketBuilder::new(
1240 now,
1241 space_id,
1242 path_id,
1243 remote_cid,
1244 &mut transmit,
1245 can_send.other,
1246 self,
1247 &mut qlog,
1248 )?;
1249 last_packet_number = Some(builder.exact_number);
1250 coalesce = coalesce && !builder.short_header;
1251
1252 if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1253 pad_datagram |= PadDatagram::ToMinMtu;
1255 }
1256 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1257 pad_datagram |= PadDatagram::ToSegmentSize;
1258 }
1259
1260 if can_send.close {
1261 trace!("sending CONNECTION_CLOSE");
1262 let mut sent_frames = SentFrames::default();
1267 let is_multipath_negotiated = self.is_multipath_negotiated();
1268 for path_id in self.spaces[space_id]
1269 .number_spaces
1270 .iter()
1271 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1272 .map(|(&path_id, _)| path_id)
1273 .collect::<Vec<_>>()
1274 {
1275 Self::populate_acks(
1276 now,
1277 self.receiving_ecn,
1278 &mut sent_frames,
1279 path_id,
1280 space_id,
1281 &mut self.spaces[space_id],
1282 is_multipath_negotiated,
1283 &mut builder.frame_space_mut(),
1284 &mut self.stats,
1285 &mut qlog,
1286 );
1287 }
1288
1289 debug_assert!(
1293 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1294 "ACKs should leave space for ConnectionClose"
1295 );
1296 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1297 let max_frame_size = builder.frame_space_remaining();
1298 match self.state.as_type() {
1299 StateType::Closed => {
1300 let reason: Close =
1301 self.state.as_closed().expect("checked").clone().into();
1302 if space_id == SpaceId::Data || reason.is_transport_layer() {
1303 reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1304 qlog.frame(&Frame::Close(reason));
1305 } else {
1306 let frame = frame::ConnectionClose {
1307 error_code: TransportErrorCode::APPLICATION_ERROR,
1308 frame_type: None,
1309 reason: Bytes::new(),
1310 };
1311 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1312 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1313 }
1314 }
1315 StateType::Draining => {
1316 let frame = frame::ConnectionClose {
1317 error_code: TransportErrorCode::NO_ERROR,
1318 frame_type: None,
1319 reason: Bytes::new(),
1320 };
1321 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1322 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1323 }
1324 _ => unreachable!(
1325 "tried to make a close packet when the connection wasn't closed"
1326 ),
1327 };
1328 }
1329 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1330 if space_id == self.highest_space {
1331 self.close = false;
1334 break;
1336 } else {
1337 space_id = space_id.next();
1341 continue;
1342 }
1343 }
1344
1345 if space_id == SpaceId::Data && builder.buf.is_empty() {
1348 let path = self.path_data_mut(path_id);
1349 if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1350 let mut builder = if let Some(fresh_cid) = self
1351 .rem_cids
1352 .get_mut(&path_id)
1353 .and_then(CidQueue::next_reserved)
1354 {
1355 PacketBuilder::new(
1356 now,
1357 space_id,
1358 path_id,
1359 fresh_cid,
1360 &mut transmit,
1361 can_send.other,
1362 self,
1363 &mut qlog,
1364 )?
1365 } else {
1366 builder
1369 };
1370 let response = frame::PathResponse(token);
1371 trace!(%response, "(off-path)");
1372 builder.frame_space_mut().write(response);
1373 qlog.frame(&Frame::PathResponse(response));
1374 self.stats.frame_tx.path_response += 1;
1375 builder.finish_and_track(
1376 now,
1377 self,
1378 path_id,
1379 SentFrames {
1380 non_retransmits: true,
1381 ..SentFrames::default()
1382 },
1383 PadDatagram::ToMinMtu,
1384 qlog,
1385 );
1386 self.stats.udp_tx.on_sent(1, transmit.len());
1387 return Some(Transmit {
1388 destination: remote,
1389 size: transmit.len(),
1390 ecn: None,
1391 segment_size: None,
1392 src_ip: self.local_ip,
1393 });
1394 }
1395 }
1396
1397 let sent_frames = {
1398 let path_exclusive_only = have_available_path
1399 && self.path_data(path_id).local_status() == PathStatus::Backup;
1400 let pn = builder.exact_number;
1401 self.populate_packet(
1402 now,
1403 space_id,
1404 path_id,
1405 path_exclusive_only,
1406 &mut builder.frame_space_mut(),
1407 pn,
1408 &mut qlog,
1409 )
1410 };
1411
1412 debug_assert!(
1419 !(sent_frames.is_ack_only(&self.streams)
1420 && !can_send.acks
1421 && can_send.other
1422 && builder.buf.segment_size()
1423 == self.path_data(path_id).current_mtu() as usize
1424 && self.datagrams.outgoing.is_empty()),
1425 "SendableFrames was {can_send:?}, but only ACKs have been written"
1426 );
1427 if sent_frames.requires_padding {
1428 pad_datagram |= PadDatagram::ToMinMtu;
1429 }
1430
1431 for (path_id, _pn) in sent_frames.largest_acked.iter() {
1432 self.spaces[space_id]
1433 .for_path(*path_id)
1434 .pending_acks
1435 .acks_sent();
1436 self.timers.stop(
1437 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1438 self.qlog.with_time(now),
1439 );
1440 }
1441
1442 if coalesce
1450 && builder
1451 .buf
1452 .datagram_remaining_mut()
1453 .saturating_sub(builder.predict_packet_end())
1454 > MIN_PACKET_SPACE
1455 && self
1456 .next_send_space(space_id, path_id, builder.buf, close)
1457 .is_some()
1458 {
1459 builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1462 } else {
1463 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1466 const MAX_PADDING: usize = 32;
1474 if builder.buf.datagram_remaining_mut()
1475 > builder.predict_packet_end() + MAX_PADDING
1476 {
1477 trace!(
1478 "GSO truncated by demand for {} padding bytes",
1479 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1480 );
1481 builder.finish_and_track(
1482 now,
1483 self,
1484 path_id,
1485 sent_frames,
1486 PadDatagram::No,
1487 qlog,
1488 );
1489 break;
1490 }
1491
1492 builder.finish_and_track(
1495 now,
1496 self,
1497 path_id,
1498 sent_frames,
1499 PadDatagram::ToSegmentSize,
1500 qlog,
1501 );
1502 } else {
1503 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1504 }
1505 if transmit.num_datagrams() == 1 {
1506 transmit.clip_datagram_size();
1507 }
1508 }
1509 }
1510
1511 if let Some(last_packet_number) = last_packet_number {
1512 self.path_data_mut(path_id).congestion.on_sent(
1515 now,
1516 transmit.len() as u64,
1517 last_packet_number,
1518 );
1519 }
1520
1521 self.qlog.emit_recovery_metrics(
1522 path_id,
1523 &mut self.paths.get_mut(&path_id).unwrap().data,
1524 now,
1525 );
1526
1527 self.app_limited = transmit.is_empty() && !congestion_blocked;
1528
1529 if transmit.is_empty() && self.state.is_established() {
1531 let space_id = SpaceId::Data;
1533 path_id = *self.paths.first_key_value().expect("one path must exist").0;
1534 let probe_data = loop {
1535 let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1541 let eligible = self.path_data(path_id).validated
1542 && !self.path_data(path_id).is_validating_path()
1543 && !self.abandoned_paths.contains(&path_id);
1544 let probe_size = eligible
1545 .then(|| {
1546 let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1547 self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1548 })
1549 .flatten();
1550 match (active_cid, probe_size) {
1551 (Some(active_cid), Some(probe_size)) => {
1552 break Some((active_cid, probe_size));
1554 }
1555 _ => {
1556 match self.paths.keys().find(|&&next| next > path_id) {
1558 Some(next) => {
1559 path_id = *next;
1560 continue;
1561 }
1562 None => break None,
1563 }
1564 }
1565 }
1566 };
1567 if let Some((active_cid, probe_size)) = probe_data {
1568 debug_assert_eq!(transmit.num_datagrams(), 0);
1570 transmit.start_new_datagram_with_size(probe_size as usize);
1571
1572 let mut qlog = QlogSentPacket::default();
1573 let mut builder = PacketBuilder::new(
1574 now,
1575 space_id,
1576 path_id,
1577 active_cid,
1578 &mut transmit,
1579 true,
1580 self,
1581 &mut qlog,
1582 )?;
1583
1584 trace!(?probe_size, "writing MTUD probe");
1586 trace!("PING");
1587 builder.frame_space_mut().write(frame::FrameType::PING);
1588 qlog.frame(&Frame::Ping);
1589 self.stats.frame_tx.ping += 1;
1590
1591 if self.peer_supports_ack_frequency() {
1593 trace!("IMMEDIATE_ACK");
1594 builder
1595 .frame_space_mut()
1596 .write(frame::FrameType::IMMEDIATE_ACK);
1597 self.stats.frame_tx.immediate_ack += 1;
1598 qlog.frame(&Frame::ImmediateAck);
1599 }
1600
1601 let sent_frames = SentFrames {
1602 non_retransmits: true,
1603 ..Default::default()
1604 };
1605 builder.finish_and_track(
1606 now,
1607 self,
1608 path_id,
1609 sent_frames,
1610 PadDatagram::ToSize(probe_size),
1611 qlog,
1612 );
1613
1614 self.path_stats
1615 .entry(path_id)
1616 .or_default()
1617 .sent_plpmtud_probes += 1;
1618 }
1619 }
1620
1621 if transmit.is_empty() {
1622 return None;
1623 }
1624
1625 let destination = self.path_data(path_id).remote;
1626 trace!(
1627 segment_size = transmit.segment_size(),
1628 last_datagram_len = transmit.len() % transmit.segment_size(),
1629 ?destination,
1630 "sending {} bytes in {} datagrams",
1631 transmit.len(),
1632 transmit.num_datagrams()
1633 );
1634 self.path_data_mut(path_id)
1635 .inc_total_sent(transmit.len() as u64);
1636
1637 self.stats
1638 .udp_tx
1639 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1640
1641 Some(Transmit {
1642 destination,
1643 size: transmit.len(),
1644 ecn: if self.path_data(path_id).sending_ecn {
1645 Some(EcnCodepoint::Ect0)
1646 } else {
1647 None
1648 },
1649 segment_size: match transmit.num_datagrams() {
1650 1 => None,
1651 _ => Some(transmit.segment_size()),
1652 },
1653 src_ip: self.local_ip,
1654 })
1655 }
1656
1657 fn next_send_space(
1662 &mut self,
1663 current_space_id: SpaceId,
1664 path_id: PathId,
1665 buf: &TransmitBuf<'_>,
1666 close: bool,
1667 ) -> Option<SpaceId> {
1668 let mut space_id = current_space_id;
1675 loop {
1676 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1677 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1678 return Some(space_id);
1679 }
1680 space_id = match space_id {
1681 SpaceId::Initial => SpaceId::Handshake,
1682 SpaceId::Handshake => SpaceId::Data,
1683 SpaceId::Data => break,
1684 }
1685 }
1686 None
1687 }
1688
1689 fn path_congestion_check(
1691 &mut self,
1692 space_id: SpaceId,
1693 path_id: PathId,
1694 transmit: &TransmitBuf<'_>,
1695 can_send: &SendableFrames,
1696 now: Instant,
1697 ) -> PathBlocked {
1698 if self.side().is_server()
1704 && self
1705 .path_data(path_id)
1706 .anti_amplification_blocked(transmit.len() as u64 + 1)
1707 {
1708 trace!(?space_id, %path_id, "blocked by anti-amplification");
1709 return PathBlocked::AntiAmplification;
1710 }
1711
1712 let bytes_to_send = transmit.segment_size() as u64;
1715 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1716
1717 if can_send.other && !need_loss_probe && !can_send.close {
1718 let path = self.path_data(path_id);
1719 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1720 trace!(?space_id, %path_id, "blocked by congestion control");
1721 return PathBlocked::Congestion;
1722 }
1723 }
1724
1725 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1727 self.timers.set(
1728 Timer::PerPath(path_id, PathTimer::Pacing),
1729 delay,
1730 self.qlog.with_time(now),
1731 );
1732 trace!(?space_id, %path_id, "blocked by pacing");
1735 return PathBlocked::Pacing;
1736 }
1737
1738 PathBlocked::No
1739 }
1740
1741 fn send_prev_path_challenge(
1746 &mut self,
1747 now: Instant,
1748 buf: &mut TransmitBuf<'_>,
1749 path_id: PathId,
1750 ) -> Option<Transmit> {
1751 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1752 if !prev_path.send_new_challenge {
1755 return None;
1756 };
1757 prev_path.send_new_challenge = false;
1758 let destination = prev_path.remote;
1759 let token = self.rng.random();
1760 let info = paths::SentChallengeInfo {
1761 sent_instant: now,
1762 remote: destination,
1763 };
1764 prev_path.challenges_sent.insert(token, info);
1765 debug_assert_eq!(
1766 self.highest_space,
1767 SpaceId::Data,
1768 "PATH_CHALLENGE queued without 1-RTT keys"
1769 );
1770 buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1771
1772 debug_assert_eq!(buf.datagram_start_offset(), 0);
1778 let mut qlog = QlogSentPacket::default();
1779 let mut builder = PacketBuilder::new(
1780 now,
1781 SpaceId::Data,
1782 path_id,
1783 *prev_cid,
1784 buf,
1785 false,
1786 self,
1787 &mut qlog,
1788 )?;
1789 let challenge = frame::PathChallenge(token);
1790 trace!(%challenge, "validating previous path");
1791 qlog.frame(&Frame::PathChallenge(challenge));
1792 builder.frame_space_mut().write(challenge);
1793 self.stats.frame_tx.path_challenge += 1;
1794
1795 builder.pad_to(MIN_INITIAL_SIZE);
1800
1801 builder.finish(self, now, qlog);
1802 self.stats.udp_tx.on_sent(1, buf.len());
1803
1804 Some(Transmit {
1805 destination,
1806 size: buf.len(),
1807 ecn: None,
1808 segment_size: None,
1809 src_ip: self.local_ip,
1810 })
1811 }
1812
1813 fn space_can_send(
1818 &mut self,
1819 space_id: SpaceId,
1820 path_id: PathId,
1821 packet_size: usize,
1822 close: bool,
1823 ) -> SendableFrames {
1824 let pn = self.spaces[SpaceId::Data]
1825 .for_path(path_id)
1826 .peek_tx_number();
1827 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1828 if self.spaces[space_id].crypto.is_none()
1829 && (space_id != SpaceId::Data
1830 || self.zero_rtt_crypto.is_none()
1831 || self.side.is_server())
1832 {
1833 return SendableFrames::empty();
1835 }
1836 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1837 if space_id == SpaceId::Data {
1838 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1839 }
1840
1841 can_send.close = close && self.spaces[space_id].crypto.is_some();
1842
1843 can_send
1844 }
1845
1846 pub fn handle_event(&mut self, event: ConnectionEvent) {
1852 use ConnectionEventInner::*;
1853 match event.0 {
1854 Datagram(DatagramConnectionEvent {
1855 now,
1856 remote,
1857 path_id,
1858 ecn,
1859 first_decode,
1860 remaining,
1861 }) => {
1862 let span = trace_span!("pkt", %path_id);
1863 let _guard = span.enter();
1864 if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
1868 if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
1869 trace!(
1870 %path_id,
1871 ?remote,
1872 path_remote = ?self.path(path_id).map(|p| p.remote),
1873 "discarding packet from unrecognized peer"
1874 );
1875 return;
1876 }
1877 }
1878
1879 let was_anti_amplification_blocked = self
1880 .path(path_id)
1881 .map(|path| path.anti_amplification_blocked(1))
1882 .unwrap_or(true); self.stats.udp_rx.datagrams += 1;
1886 self.stats.udp_rx.bytes += first_decode.len() as u64;
1887 let data_len = first_decode.len();
1888
1889 self.handle_decode(now, remote, path_id, ecn, first_decode);
1890 if let Some(path) = self.path_mut(path_id) {
1895 path.inc_total_recvd(data_len as u64);
1896 }
1897
1898 if let Some(data) = remaining {
1899 self.stats.udp_rx.bytes += data.len() as u64;
1900 self.handle_coalesced(now, remote, path_id, ecn, data);
1901 }
1902
1903 if let Some(path) = self.paths.get_mut(&path_id) {
1904 self.qlog
1905 .emit_recovery_metrics(path_id, &mut path.data, now);
1906 }
1907
1908 if was_anti_amplification_blocked {
1909 self.set_loss_detection_timer(now, path_id);
1913 }
1914 }
1915 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1916 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1917 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1918 let cid_state = self
1919 .local_cid_state
1920 .entry(path_id)
1921 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1922 cid_state.new_cids(&ids, now);
1923
1924 ids.into_iter().rev().for_each(|frame| {
1925 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1926 });
1927 self.reset_cid_retirement(now);
1929 }
1930 }
1931 }
1932
1933 pub fn handle_timeout(&mut self, now: Instant) {
1943 while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
1944 trace!(?timer, at=?now, "timeout");
1946 match timer {
1947 Timer::Conn(timer) => match timer {
1948 ConnTimer::Close => {
1949 self.state.move_to_drained(None);
1950 self.endpoint_events.push_back(EndpointEventInner::Drained);
1951 }
1952 ConnTimer::Idle => {
1953 self.kill(ConnectionError::TimedOut);
1954 }
1955 ConnTimer::KeepAlive => {
1956 trace!("sending keep-alive");
1957 self.ping();
1958 }
1959 ConnTimer::KeyDiscard => {
1960 self.zero_rtt_crypto = None;
1961 self.prev_crypto = None;
1962 }
1963 ConnTimer::PushNewCid => {
1964 while let Some((path_id, when)) = self.next_cid_retirement() {
1965 if when > now {
1966 break;
1967 }
1968 match self.local_cid_state.get_mut(&path_id) {
1969 None => error!(%path_id, "No local CID state for path"),
1970 Some(cid_state) => {
1971 let num_new_cid = cid_state.on_cid_timeout().into();
1973 if !self.state.is_closed() {
1974 trace!(
1975 "push a new CID to peer RETIRE_PRIOR_TO field {}",
1976 cid_state.retire_prior_to()
1977 );
1978 self.endpoint_events.push_back(
1979 EndpointEventInner::NeedIdentifiers(
1980 path_id,
1981 now,
1982 num_new_cid,
1983 ),
1984 );
1985 }
1986 }
1987 }
1988 }
1989 }
1990 },
1991 Timer::PerPath(path_id, timer) => {
1993 let span = trace_span!("per-path timer fired", %path_id, ?timer);
1994 let _guard = span.enter();
1995 match timer {
1996 PathTimer::PathIdle => {
1997 self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
1998 .ok();
1999 }
2000
2001 PathTimer::PathKeepAlive => {
2002 trace!("sending keep-alive on path");
2003 self.ping_path(path_id).ok();
2004 }
2005 PathTimer::LossDetection => {
2006 self.on_loss_detection_timeout(now, path_id);
2007 self.qlog.emit_recovery_metrics(
2008 path_id,
2009 &mut self.paths.get_mut(&path_id).unwrap().data,
2010 now,
2011 );
2012 }
2013 PathTimer::PathValidation => {
2014 let Some(path) = self.paths.get_mut(&path_id) else {
2015 continue;
2016 };
2017 self.timers.stop(
2018 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
2019 self.qlog.with_time(now),
2020 );
2021 debug!("path validation failed");
2022 if let Some((_, prev)) = path.prev.take() {
2023 path.data = prev;
2024 }
2025 path.data.challenges_sent.clear();
2026 path.data.send_new_challenge = false;
2027 }
2028 PathTimer::PathChallengeLost => {
2029 let Some(path) = self.paths.get_mut(&path_id) else {
2030 continue;
2031 };
2032 trace!("path challenge deemed lost");
2033 path.data.send_new_challenge = true;
2034 }
2035 PathTimer::PathOpen => {
2036 let Some(path) = self.path_mut(path_id) else {
2037 continue;
2038 };
2039 path.challenges_sent.clear();
2040 path.send_new_challenge = false;
2041 debug!("new path validation failed");
2042 if let Err(err) = self.close_path(
2043 now,
2044 path_id,
2045 TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
2046 ) {
2047 warn!(?err, "failed closing path");
2048 }
2049
2050 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
2051 id: path_id,
2052 error: PathError::ValidationFailed,
2053 }));
2054 }
2055 PathTimer::Pacing => trace!("pacing timer expired"),
2056 PathTimer::MaxAckDelay => {
2057 trace!("max ack delay reached");
2058 self.spaces[SpaceId::Data]
2060 .for_path(path_id)
2061 .pending_acks
2062 .on_max_ack_delay_timeout()
2063 }
2064 PathTimer::DiscardPath => {
2065 self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2068 if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
2069 let (min_seq, max_seq) = loc_cid_state.active_seq();
2070 for seq in min_seq..=max_seq {
2071 self.endpoint_events.push_back(
2072 EndpointEventInner::RetireConnectionId(
2073 now, path_id, seq, false,
2074 ),
2075 );
2076 }
2077 }
2078 self.discard_path(path_id, now);
2079 }
2080 }
2081 }
2082 }
2083 }
2084 }
2085
2086 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2098 self.close_inner(
2099 now,
2100 Close::Application(frame::ApplicationClose { error_code, reason }),
2101 )
2102 }
2103
2104 fn close_inner(&mut self, now: Instant, reason: Close) {
2105 let was_closed = self.state.is_closed();
2106 if !was_closed {
2107 self.close_common();
2108 self.set_close_timer(now);
2109 self.close = true;
2110 self.state.move_to_closed_local(reason);
2111 }
2112 }
2113
2114 pub fn datagrams(&mut self) -> Datagrams<'_> {
2116 Datagrams { conn: self }
2117 }
2118
2119 pub fn stats(&mut self) -> ConnectionStats {
2121 self.stats.clone()
2122 }
2123
2124 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2126 let path = self.paths.get(&path_id)?;
2127 let stats = self.path_stats.entry(path_id).or_default();
2128 stats.rtt = path.data.rtt.get();
2129 stats.cwnd = path.data.congestion.window();
2130 stats.current_mtu = path.data.mtud.current_mtu();
2131 Some(*stats)
2132 }
2133
2134 pub fn ping(&mut self) {
2138 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2141 path_data.ping_pending = true;
2142 }
2143 }
2144
2145 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2149 let path_data = self.spaces[self.highest_space]
2150 .number_spaces
2151 .get_mut(&path)
2152 .ok_or(ClosedPath { _private: () })?;
2153 path_data.ping_pending = true;
2154 Ok(())
2155 }
2156
2157 pub fn force_key_update(&mut self) {
2161 if !self.state.is_established() {
2162 debug!("ignoring forced key update in illegal state");
2163 return;
2164 }
2165 if self.prev_crypto.is_some() {
2166 debug!("ignoring redundant forced key update");
2169 return;
2170 }
2171 self.update_keys(None, false);
2172 }
2173
2174 #[doc(hidden)]
2176 #[deprecated]
2177 pub fn initiate_key_update(&mut self) {
2178 self.force_key_update();
2179 }
2180
2181 pub fn crypto_session(&self) -> &dyn crypto::Session {
2183 &*self.crypto
2184 }
2185
2186 pub fn is_handshaking(&self) -> bool {
2191 self.state.is_handshake()
2192 }
2193
2194 pub fn is_closed(&self) -> bool {
2202 self.state.is_closed()
2203 }
2204
2205 pub fn is_drained(&self) -> bool {
2210 self.state.is_drained()
2211 }
2212
2213 pub fn accepted_0rtt(&self) -> bool {
2217 self.accepted_0rtt
2218 }
2219
2220 pub fn has_0rtt(&self) -> bool {
2222 self.zero_rtt_enabled
2223 }
2224
2225 pub fn has_pending_retransmits(&self) -> bool {
2227 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2228 }
2229
2230 pub fn side(&self) -> Side {
2232 self.side.side()
2233 }
2234
2235 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2237 self.path(path_id)
2238 .map(|path_data| {
2239 path_data
2240 .last_observed_addr_report
2241 .as_ref()
2242 .map(|observed| observed.socket_addr())
2243 })
2244 .ok_or(ClosedPath { _private: () })
2245 }
2246
2247 pub fn local_ip(&self) -> Option<IpAddr> {
2257 self.local_ip
2258 }
2259
2260 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2262 self.path(path_id).map(|d| d.rtt.get())
2263 }
2264
2265 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2267 self.path(path_id).map(|d| d.congestion.as_ref())
2268 }
2269
2270 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2275 self.streams.set_max_concurrent(dir, count);
2276 let pending = &mut self.spaces[SpaceId::Data].pending;
2279 self.streams.queue_max_stream_id(pending);
2280 }
2281
2282 pub fn set_max_concurrent_paths(
2292 &mut self,
2293 now: Instant,
2294 count: NonZeroU32,
2295 ) -> Result<(), MultipathNotNegotiated> {
2296 if !self.is_multipath_negotiated() {
2297 return Err(MultipathNotNegotiated { _private: () });
2298 }
2299 self.max_concurrent_paths = count;
2300
2301 let in_use_count = self
2302 .local_max_path_id
2303 .next()
2304 .saturating_sub(self.abandoned_paths.len() as u32)
2305 .as_u32();
2306 let extra_needed = count.get().saturating_sub(in_use_count);
2307 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2308
2309 self.set_max_path_id(now, new_max_path_id);
2310
2311 Ok(())
2312 }
2313
2314 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2316 if max_path_id <= self.local_max_path_id {
2317 return;
2318 }
2319
2320 self.local_max_path_id = max_path_id;
2321 self.spaces[SpaceId::Data].pending.max_path_id = true;
2322
2323 self.issue_first_path_cids(now);
2324 }
2325
2326 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2332 self.streams.max_concurrent(dir)
2333 }
2334
2335 pub fn set_send_window(&mut self, send_window: u64) {
2337 self.streams.set_send_window(send_window);
2338 }
2339
2340 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2342 if self.streams.set_receive_window(receive_window) {
2343 self.spaces[SpaceId::Data].pending.max_data = true;
2344 }
2345 }
2346
2347 pub fn is_multipath_negotiated(&self) -> bool {
2352 !self.is_handshaking()
2353 && self.config.max_concurrent_multipath_paths.is_some()
2354 && self.peer_params.initial_max_path_id.is_some()
2355 }
2356
2357 fn on_ack_received(
2358 &mut self,
2359 now: Instant,
2360 space: SpaceId,
2361 ack: frame::Ack,
2362 ) -> Result<(), TransportError> {
2363 let path = PathId::ZERO;
2365 self.inner_on_ack_received(now, space, path, ack)
2366 }
2367
2368 fn on_path_ack_received(
2369 &mut self,
2370 now: Instant,
2371 space: SpaceId,
2372 path_ack: frame::PathAck,
2373 ) -> Result<(), TransportError> {
2374 let (ack, path) = path_ack.into_ack();
2375 self.inner_on_ack_received(now, space, path, ack)
2376 }
2377
2378 fn inner_on_ack_received(
2380 &mut self,
2381 now: Instant,
2382 space: SpaceId,
2383 path: PathId,
2384 ack: frame::Ack,
2385 ) -> Result<(), TransportError> {
2386 if self.abandoned_paths.contains(&path) {
2387 trace!("silently ignoring PATH_ACK on abandoned path");
2390 return Ok(());
2391 }
2392 if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2393 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2394 }
2395 let new_largest = {
2396 let space = &mut self.spaces[space].for_path(path);
2397 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2398 space.largest_acked_packet = Some(ack.largest);
2399 if let Some(info) = space.sent_packets.get(ack.largest) {
2400 space.largest_acked_packet_sent = info.time_sent;
2404 }
2405 true
2406 } else {
2407 false
2408 }
2409 };
2410
2411 if self.detect_spurious_loss(&ack, space, path) {
2412 self.path_data_mut(path)
2413 .congestion
2414 .on_spurious_congestion_event();
2415 }
2416
2417 let mut newly_acked = ArrayRangeSet::new();
2419 for range in ack.iter() {
2420 self.spaces[space].for_path(path).check_ack(range.clone())?;
2421 for (pn, _) in self.spaces[space]
2422 .for_path(path)
2423 .sent_packets
2424 .iter_range(range)
2425 {
2426 newly_acked.insert_one(pn);
2427 }
2428 }
2429
2430 if newly_acked.is_empty() {
2431 return Ok(());
2432 }
2433
2434 let mut ack_eliciting_acked = false;
2435 for packet in newly_acked.elts() {
2436 if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2437 for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2438 if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2444 pns.pending_acks.subtract_below(*acked_pn);
2445 }
2446 }
2447 ack_eliciting_acked |= info.ack_eliciting;
2448
2449 let path_data = self.path_data_mut(path);
2451 let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
2452 if mtu_updated {
2453 path_data
2454 .congestion
2455 .on_mtu_update(path_data.mtud.current_mtu());
2456 }
2457
2458 self.ack_frequency.on_acked(path, packet);
2460
2461 self.on_packet_acked(now, path, info);
2462 }
2463 }
2464
2465 let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
2466 let app_limited = self.app_limited;
2467 let path_data = self.path_data_mut(path);
2468 let in_flight = path_data.in_flight.bytes;
2469
2470 path_data
2471 .congestion
2472 .on_end_acks(now, in_flight, app_limited, largest_ackd);
2473
2474 if new_largest && ack_eliciting_acked {
2475 let ack_delay = if space != SpaceId::Data {
2476 Duration::from_micros(0)
2477 } else {
2478 cmp::min(
2479 self.ack_frequency.peer_max_ack_delay,
2480 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2481 )
2482 };
2483 let rtt = now.saturating_duration_since(
2484 self.spaces[space].for_path(path).largest_acked_packet_sent,
2485 );
2486
2487 let next_pn = self.spaces[space].for_path(path).next_packet_number;
2488 let path_data = self.path_data_mut(path);
2489 path_data.rtt.update(ack_delay, rtt);
2491 if path_data.first_packet_after_rtt_sample.is_none() {
2492 path_data.first_packet_after_rtt_sample = Some((space, next_pn));
2493 }
2494 }
2495
2496 self.detect_lost_packets(now, space, path, true);
2498
2499 if self.peer_completed_address_validation(path) {
2500 self.path_data_mut(path).pto_count = 0;
2501 }
2502
2503 if self.path_data(path).sending_ecn {
2508 if let Some(ecn) = ack.ecn {
2509 if new_largest {
2514 let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2515 self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2516 }
2517 } else {
2518 debug!("ECN not acknowledged by peer");
2520 self.path_data_mut(path).sending_ecn = false;
2521 }
2522 }
2523
2524 self.set_loss_detection_timer(now, path);
2525 Ok(())
2526 }
2527
2528 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2529 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2530
2531 if lost_packets.is_empty() {
2532 return false;
2533 }
2534
2535 for range in ack.iter() {
2536 let spurious_losses: Vec<u64> = lost_packets
2537 .iter_range(range.clone())
2538 .map(|(pn, _info)| pn)
2539 .collect();
2540
2541 for pn in spurious_losses {
2542 lost_packets.remove(pn);
2543 }
2544 }
2545
2546 lost_packets.is_empty()
2551 }
2552
2553 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2558 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2559
2560 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2561 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2562 }
2563
2564 fn process_ecn(
2566 &mut self,
2567 now: Instant,
2568 space: SpaceId,
2569 path: PathId,
2570 newly_acked: u64,
2571 ecn: frame::EcnCounts,
2572 largest_sent_time: Instant,
2573 ) {
2574 match self.spaces[space]
2575 .for_path(path)
2576 .detect_ecn(newly_acked, ecn)
2577 {
2578 Err(e) => {
2579 debug!("halting ECN due to verification failure: {}", e);
2580
2581 self.path_data_mut(path).sending_ecn = false;
2582 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2585 }
2586 Ok(false) => {}
2587 Ok(true) => {
2588 self.path_stats.entry(path).or_default().congestion_events += 1;
2589 self.path_data_mut(path).congestion.on_congestion_event(
2590 now,
2591 largest_sent_time,
2592 false,
2593 true,
2594 0,
2595 );
2596 }
2597 }
2598 }
2599
2600 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2603 self.paths
2604 .get_mut(&path_id)
2605 .expect("known path")
2606 .remove_in_flight(&info);
2607 let app_limited = self.app_limited;
2608 let path = self.path_data_mut(path_id);
2609 if info.ack_eliciting && !path.is_validating_path() {
2610 let rtt = path.rtt;
2613 path.congestion
2614 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2615 }
2616
2617 if let Some(retransmits) = info.retransmits.get() {
2619 for (id, _) in retransmits.reset_stream.iter() {
2620 self.streams.reset_acked(*id);
2621 }
2622 }
2623
2624 for frame in info.stream_frames {
2625 self.streams.received_ack_of(frame);
2626 }
2627 }
2628
2629 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2630 let start = if self.zero_rtt_crypto.is_some() {
2631 now
2632 } else {
2633 self.prev_crypto
2634 .as_ref()
2635 .expect("no previous keys")
2636 .end_packet
2637 .as_ref()
2638 .expect("update not acknowledged yet")
2639 .1
2640 };
2641
2642 self.timers.set(
2644 Timer::Conn(ConnTimer::KeyDiscard),
2645 start + self.pto_max_path(space) * 3,
2646 self.qlog.with_time(now),
2647 );
2648 }
2649
2650 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2663 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2664 self.detect_lost_packets(now, pn_space, path_id, false);
2666 self.set_loss_detection_timer(now, path_id);
2667 return;
2668 }
2669
2670 let (_, space) = match self.pto_time_and_space(now, path_id) {
2671 Some(x) => x,
2672 None => {
2673 error!(%path_id, "PTO expired while unset");
2674 return;
2675 }
2676 };
2677 trace!(
2678 in_flight = self.path_data(path_id).in_flight.bytes,
2679 count = self.path_data(path_id).pto_count,
2680 ?space,
2681 %path_id,
2682 "PTO fired"
2683 );
2684
2685 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2686 0 => {
2689 debug_assert!(!self.peer_completed_address_validation(path_id));
2690 1
2691 }
2692 _ => 2,
2694 };
2695 let pns = self.spaces[space].for_path(path_id);
2696 pns.loss_probes = pns.loss_probes.saturating_add(count);
2697 let path_data = self.path_data_mut(path_id);
2698 path_data.pto_count = path_data.pto_count.saturating_add(1);
2699 self.set_loss_detection_timer(now, path_id);
2700 }
2701
2702 fn detect_lost_packets(
2719 &mut self,
2720 now: Instant,
2721 pn_space: SpaceId,
2722 path_id: PathId,
2723 due_to_ack: bool,
2724 ) {
2725 let mut lost_packets = Vec::<u64>::new();
2726 let mut lost_mtu_probe = None;
2727 let mut in_persistent_congestion = false;
2728 let mut size_of_lost_packets = 0u64;
2729 self.spaces[pn_space].for_path(path_id).loss_time = None;
2730
2731 let path = self.path_data(path_id);
2734 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2735 let loss_delay = path
2736 .rtt
2737 .conservative()
2738 .mul_f32(self.config.time_threshold)
2739 .max(TIMER_GRANULARITY);
2740 let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
2741
2742 let largest_acked_packet = self.spaces[pn_space]
2743 .for_path(path_id)
2744 .largest_acked_packet
2745 .expect("detect_lost_packets only to be called if path received at least one ACK");
2746 let packet_threshold = self.config.packet_threshold as u64;
2747
2748 let congestion_period = self
2752 .pto(SpaceId::Data, path_id)
2753 .saturating_mul(self.config.persistent_congestion_threshold);
2754 let mut persistent_congestion_start: Option<Instant> = None;
2755 let mut prev_packet = None;
2756 let space = self.spaces[pn_space].for_path(path_id);
2757
2758 for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
2759 if prev_packet != Some(packet.wrapping_sub(1)) {
2760 persistent_congestion_start = None;
2762 }
2763
2764 let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
2768 if packet_too_old || largest_acked_packet >= packet + packet_threshold {
2769 if Some(packet) == in_flight_mtu_probe {
2771 lost_mtu_probe = in_flight_mtu_probe;
2774 } else {
2775 lost_packets.push(packet);
2776 size_of_lost_packets += info.size as u64;
2777 if info.ack_eliciting && due_to_ack {
2778 match persistent_congestion_start {
2779 Some(start) if info.time_sent - start > congestion_period => {
2782 in_persistent_congestion = true;
2783 }
2784 None if first_packet_after_rtt_sample
2786 .is_some_and(|x| x < (pn_space, packet)) =>
2787 {
2788 persistent_congestion_start = Some(info.time_sent);
2789 }
2790 _ => {}
2791 }
2792 }
2793 }
2794 } else {
2795 if space.loss_time.is_none() {
2797 space.loss_time = Some(info.time_sent + loss_delay);
2800 }
2801 persistent_congestion_start = None;
2802 }
2803
2804 prev_packet = Some(packet);
2805 }
2806
2807 self.handle_lost_packets(
2808 pn_space,
2809 path_id,
2810 now,
2811 lost_packets,
2812 lost_mtu_probe,
2813 loss_delay,
2814 in_persistent_congestion,
2815 size_of_lost_packets,
2816 );
2817 }
2818
2819 fn discard_path(&mut self, path_id: PathId, now: Instant) {
2821 trace!(%path_id, "dropping path state");
2822 let path = self.path_data(path_id);
2823 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2824
2825 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2827 .for_path(path_id)
2828 .sent_packets
2829 .iter()
2830 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2831 .map(|(pn, info)| {
2832 size_of_lost_packets += info.size as u64;
2833 pn
2834 })
2835 .collect();
2836
2837 if !lost_pns.is_empty() {
2838 trace!(
2839 %path_id,
2840 count = lost_pns.len(),
2841 lost_bytes = size_of_lost_packets,
2842 "packets lost on path abandon"
2843 );
2844 self.handle_lost_packets(
2845 SpaceId::Data,
2846 path_id,
2847 now,
2848 lost_pns,
2849 in_flight_mtu_probe,
2850 Duration::ZERO,
2851 false,
2852 size_of_lost_packets,
2853 );
2854 }
2855 self.paths.remove(&path_id);
2856 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2857
2858 let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2859 self.events.push_back(
2860 PathEvent::Abandoned {
2861 id: path_id,
2862 path_stats,
2863 }
2864 .into(),
2865 );
2866 }
2867
2868 fn handle_lost_packets(
2869 &mut self,
2870 pn_space: SpaceId,
2871 path_id: PathId,
2872 now: Instant,
2873 lost_packets: Vec<u64>,
2874 lost_mtu_probe: Option<u64>,
2875 loss_delay: Duration,
2876 in_persistent_congestion: bool,
2877 size_of_lost_packets: u64,
2878 ) {
2879 debug_assert!(
2880 {
2881 let mut sorted = lost_packets.clone();
2882 sorted.sort();
2883 sorted == lost_packets
2884 },
2885 "lost_packets must be sorted"
2886 );
2887
2888 self.drain_lost_packets(now, pn_space, path_id);
2889
2890 if let Some(largest_lost) = lost_packets.last().cloned() {
2892 let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
2893 let largest_lost_sent = self.spaces[pn_space]
2894 .for_path(path_id)
2895 .sent_packets
2896 .get(largest_lost)
2897 .unwrap()
2898 .time_sent;
2899 let path_stats = self.path_stats.entry(path_id).or_default();
2900 path_stats.lost_packets += lost_packets.len() as u64;
2901 path_stats.lost_bytes += size_of_lost_packets;
2902 trace!(
2903 %path_id,
2904 count = lost_packets.len(),
2905 lost_bytes = size_of_lost_packets,
2906 "packets lost",
2907 );
2908
2909 for &packet in &lost_packets {
2910 let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
2911 continue;
2912 };
2913 self.qlog
2914 .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
2915 self.paths
2916 .get_mut(&path_id)
2917 .unwrap()
2918 .remove_in_flight(&info);
2919
2920 for frame in info.stream_frames {
2921 self.streams.retransmit(frame);
2922 }
2923 self.spaces[pn_space].pending |= info.retransmits;
2924 self.path_data_mut(path_id)
2925 .mtud
2926 .on_non_probe_lost(packet, info.size);
2927
2928 self.spaces[pn_space].for_path(path_id).lost_packets.insert(
2929 packet,
2930 LostPacket {
2931 time_sent: info.time_sent,
2932 },
2933 );
2934 }
2935
2936 let path = self.path_data_mut(path_id);
2937 if path.mtud.black_hole_detected(now) {
2938 path.congestion.on_mtu_update(path.mtud.current_mtu());
2939 if let Some(max_datagram_size) = self.datagrams().max_size() {
2940 self.datagrams.drop_oversized(max_datagram_size);
2941 }
2942 self.path_stats
2943 .entry(path_id)
2944 .or_default()
2945 .black_holes_detected += 1;
2946 }
2947
2948 let lost_ack_eliciting =
2950 old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
2951
2952 if lost_ack_eliciting {
2953 self.path_stats
2954 .entry(path_id)
2955 .or_default()
2956 .congestion_events += 1;
2957 self.path_data_mut(path_id).congestion.on_congestion_event(
2958 now,
2959 largest_lost_sent,
2960 in_persistent_congestion,
2961 false,
2962 size_of_lost_packets,
2963 );
2964 }
2965 }
2966
2967 if let Some(packet) = lost_mtu_probe {
2969 let info = self.spaces[SpaceId::Data]
2970 .for_path(path_id)
2971 .take(packet)
2972 .unwrap(); self.paths
2975 .get_mut(&path_id)
2976 .unwrap()
2977 .remove_in_flight(&info);
2978 self.path_data_mut(path_id).mtud.on_probe_lost();
2979 self.path_stats
2980 .entry(path_id)
2981 .or_default()
2982 .lost_plpmtud_probes += 1;
2983 }
2984 }
2985
2986 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2992 SpaceId::iter()
2993 .filter_map(|id| {
2994 self.spaces[id]
2995 .number_spaces
2996 .get(&path_id)
2997 .and_then(|pns| pns.loss_time)
2998 .map(|time| (time, id))
2999 })
3000 .min_by_key(|&(time, _)| time)
3001 }
3002
3003 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
3005 let path = self.path(path_id)?;
3006 let pto_count = path.pto_count;
3007 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
3008 let mut duration = path.rtt.pto_base() * backoff;
3009
3010 if path_id == PathId::ZERO
3011 && path.in_flight.ack_eliciting == 0
3012 && !self.peer_completed_address_validation(PathId::ZERO)
3013 {
3014 let space = match self.highest_space {
3020 SpaceId::Handshake => SpaceId::Handshake,
3021 _ => SpaceId::Initial,
3022 };
3023
3024 return Some((now + duration, space));
3025 }
3026
3027 let mut result = None;
3028 for space in SpaceId::iter() {
3029 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
3030 continue;
3031 };
3032
3033 if !pns.has_in_flight() {
3034 continue;
3035 }
3036 if space == SpaceId::Data {
3037 if self.is_handshaking() {
3039 return result;
3040 }
3041 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3043 }
3044 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3045 continue;
3046 };
3047 let pto = last_ack_eliciting + duration;
3048 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3049 if path.anti_amplification_blocked(1) {
3050 continue;
3052 }
3053 if path.in_flight.ack_eliciting == 0 {
3054 continue;
3056 }
3057 result = Some((pto, space));
3058 }
3059 }
3060 result
3061 }
3062
3063 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3064 if self.side.is_server() || self.state.is_closed() {
3066 return true;
3067 }
3068 self.spaces[SpaceId::Handshake]
3071 .path_space(PathId::ZERO)
3072 .and_then(|pns| pns.largest_acked_packet)
3073 .is_some()
3074 || self.spaces[SpaceId::Data]
3075 .path_space(path)
3076 .and_then(|pns| pns.largest_acked_packet)
3077 .is_some()
3078 || (self.spaces[SpaceId::Data].crypto.is_some()
3079 && self.spaces[SpaceId::Handshake].crypto.is_none())
3080 }
3081
3082 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3090 if self.state.is_closed() {
3091 return;
3095 }
3096
3097 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3098 self.timers.set(
3100 Timer::PerPath(path_id, PathTimer::LossDetection),
3101 loss_time,
3102 self.qlog.with_time(now),
3103 );
3104 return;
3105 }
3106
3107 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3110 self.timers.set(
3111 Timer::PerPath(path_id, PathTimer::LossDetection),
3112 timeout,
3113 self.qlog.with_time(now),
3114 );
3115 } else {
3116 self.timers.stop(
3117 Timer::PerPath(path_id, PathTimer::LossDetection),
3118 self.qlog.with_time(now),
3119 );
3120 }
3121 }
3122
3123 fn pto_max_path(&self, space: SpaceId) -> Duration {
3127 match space {
3128 SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3129 SpaceId::Data => self
3130 .paths
3131 .keys()
3132 .map(|path_id| self.pto(space, *path_id))
3133 .max()
3134 .expect("there should be one at least path"),
3135 }
3136 }
3137
3138 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3143 let max_ack_delay = match space {
3144 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3145 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3146 };
3147 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3148 }
3149
3150 fn on_packet_authenticated(
3151 &mut self,
3152 now: Instant,
3153 space_id: SpaceId,
3154 path_id: PathId,
3155 ecn: Option<EcnCodepoint>,
3156 packet: Option<u64>,
3157 spin: bool,
3158 is_1rtt: bool,
3159 ) {
3160 self.total_authed_packets += 1;
3161 if let Some(last_allowed_receive) = self
3162 .paths
3163 .get(&path_id)
3164 .and_then(|path| path.data.last_allowed_receive)
3165 {
3166 if now > last_allowed_receive {
3167 warn!("received data on path which we abandoned more than 3 * PTO ago");
3168 if !self.state.is_closed() {
3170 self.state.move_to_closed(TransportError::NO_ERROR(
3172 "peer failed to respond with PATH_ABANDON in time",
3173 ));
3174 self.close_common();
3175 self.set_close_timer(now);
3176 self.close = true;
3177 }
3178 return;
3179 }
3180 }
3181
3182 self.reset_keep_alive(path_id, now);
3183 self.reset_idle_timeout(now, space_id, path_id);
3184 self.permit_idle_reset = true;
3185 self.receiving_ecn |= ecn.is_some();
3186 if let Some(x) = ecn {
3187 let space = &mut self.spaces[space_id];
3188 space.for_path(path_id).ecn_counters += x;
3189
3190 if x.is_ce() {
3191 space
3192 .for_path(path_id)
3193 .pending_acks
3194 .set_immediate_ack_required();
3195 }
3196 }
3197
3198 let packet = match packet {
3199 Some(x) => x,
3200 None => return,
3201 };
3202 match &self.side {
3203 ConnectionSide::Client { .. } => {
3204 if space_id == SpaceId::Handshake {
3208 if let Some(hs) = self.state.as_handshake_mut() {
3209 hs.allow_server_migration = false;
3210 }
3211 }
3212 }
3213 ConnectionSide::Server { .. } => {
3214 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3215 {
3216 self.discard_space(now, SpaceId::Initial);
3218 }
3219 if self.zero_rtt_crypto.is_some() && is_1rtt {
3220 self.set_key_discard_timer(now, space_id)
3222 }
3223 }
3224 }
3225 let space = self.spaces[space_id].for_path(path_id);
3226 space.pending_acks.insert_one(packet, now);
3227 if packet >= space.rx_packet.unwrap_or_default() {
3228 space.rx_packet = Some(packet);
3229 self.spin = self.side.is_client() ^ spin;
3231 }
3232 }
3233
3234 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3239 if let Some(timeout) = self.idle_timeout {
3241 if self.state.is_closed() {
3242 self.timers
3243 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3244 } else {
3245 let dt = cmp::max(timeout, 3 * self.pto_max_path(space));
3246 self.timers.set(
3247 Timer::Conn(ConnTimer::Idle),
3248 now + dt,
3249 self.qlog.with_time(now),
3250 );
3251 }
3252 }
3253
3254 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3256 if self.state.is_closed() {
3257 self.timers.stop(
3258 Timer::PerPath(path_id, PathTimer::PathIdle),
3259 self.qlog.with_time(now),
3260 );
3261 } else {
3262 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3263 self.timers.set(
3264 Timer::PerPath(path_id, PathTimer::PathIdle),
3265 now + dt,
3266 self.qlog.with_time(now),
3267 );
3268 }
3269 }
3270 }
3271
3272 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3274 if !self.state.is_established() {
3275 return;
3276 }
3277
3278 if let Some(interval) = self.config.keep_alive_interval {
3279 self.timers.set(
3280 Timer::Conn(ConnTimer::KeepAlive),
3281 now + interval,
3282 self.qlog.with_time(now),
3283 );
3284 }
3285
3286 if let Some(interval) = self.path_data(path_id).keep_alive {
3287 self.timers.set(
3288 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3289 now + interval,
3290 self.qlog.with_time(now),
3291 );
3292 }
3293 }
3294
3295 fn reset_cid_retirement(&mut self, now: Instant) {
3297 if let Some((_path, t)) = self.next_cid_retirement() {
3298 self.timers.set(
3299 Timer::Conn(ConnTimer::PushNewCid),
3300 t,
3301 self.qlog.with_time(now),
3302 );
3303 }
3304 }
3305
3306 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3308 self.local_cid_state
3309 .iter()
3310 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3311 .min_by_key(|(_path_id, timeout)| *timeout)
3312 }
3313
3314 pub(crate) fn handle_first_packet(
3319 &mut self,
3320 now: Instant,
3321 remote: SocketAddr,
3322 ecn: Option<EcnCodepoint>,
3323 packet_number: u64,
3324 packet: InitialPacket,
3325 remaining: Option<BytesMut>,
3326 ) -> Result<(), ConnectionError> {
3327 let span = trace_span!("first recv");
3328 let _guard = span.enter();
3329 debug_assert!(self.side.is_server());
3330 let len = packet.header_data.len() + packet.payload.len();
3331 let path_id = PathId::ZERO;
3332 self.path_data_mut(path_id).total_recvd = len as u64;
3333
3334 if let Some(hs) = self.state.as_handshake_mut() {
3335 hs.expected_token = packet.header.token.clone();
3336 } else {
3337 unreachable!("first packet must be delivered in Handshake state");
3338 }
3339
3340 self.on_packet_authenticated(
3342 now,
3343 SpaceId::Initial,
3344 path_id,
3345 ecn,
3346 Some(packet_number),
3347 false,
3348 false,
3349 );
3350
3351 let packet: Packet = packet.into();
3352
3353 let mut qlog = QlogRecvPacket::new(len);
3354 qlog.header(&packet.header, Some(packet_number), path_id);
3355
3356 self.process_decrypted_packet(
3357 now,
3358 remote,
3359 path_id,
3360 Some(packet_number),
3361 packet,
3362 &mut qlog,
3363 )?;
3364 self.qlog.emit_packet_received(qlog, now);
3365 if let Some(data) = remaining {
3366 self.handle_coalesced(now, remote, path_id, ecn, data);
3367 }
3368
3369 self.qlog.emit_recovery_metrics(
3370 path_id,
3371 &mut self.paths.get_mut(&path_id).unwrap().data,
3372 now,
3373 );
3374
3375 Ok(())
3376 }
3377
3378 fn init_0rtt(&mut self, now: Instant) {
3379 let (header, packet) = match self.crypto.early_crypto() {
3380 Some(x) => x,
3381 None => return,
3382 };
3383 if self.side.is_client() {
3384 match self.crypto.transport_parameters() {
3385 Ok(params) => {
3386 let params = params
3387 .expect("crypto layer didn't supply transport parameters with ticket");
3388 let params = TransportParameters {
3390 initial_src_cid: None,
3391 original_dst_cid: None,
3392 preferred_address: None,
3393 retry_src_cid: None,
3394 stateless_reset_token: None,
3395 min_ack_delay: None,
3396 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3397 max_ack_delay: TransportParameters::default().max_ack_delay,
3398 initial_max_path_id: None,
3399 ..params
3400 };
3401 self.set_peer_params(params);
3402 self.qlog.emit_peer_transport_params_restored(self, now);
3403 }
3404 Err(e) => {
3405 error!("session ticket has malformed transport parameters: {}", e);
3406 return;
3407 }
3408 }
3409 }
3410 trace!("0-RTT enabled");
3411 self.zero_rtt_enabled = true;
3412 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3413 }
3414
3415 fn read_crypto(
3416 &mut self,
3417 space: SpaceId,
3418 crypto: &frame::Crypto,
3419 payload_len: usize,
3420 ) -> Result<(), TransportError> {
3421 let expected = if !self.state.is_handshake() {
3422 SpaceId::Data
3423 } else if self.highest_space == SpaceId::Initial {
3424 SpaceId::Initial
3425 } else {
3426 SpaceId::Handshake
3429 };
3430 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3434
3435 let end = crypto.offset + crypto.data.len() as u64;
3436 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3437 warn!(
3438 "received new {:?} CRYPTO data when expecting {:?}",
3439 space, expected
3440 );
3441 return Err(TransportError::PROTOCOL_VIOLATION(
3442 "new data at unexpected encryption level",
3443 ));
3444 }
3445
3446 let space = &mut self.spaces[space];
3447 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3448 if max > self.config.crypto_buffer_size as u64 {
3449 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3450 }
3451
3452 space
3453 .crypto_stream
3454 .insert(crypto.offset, crypto.data.clone(), payload_len);
3455 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3456 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3457 if self.crypto.read_handshake(&chunk.bytes)? {
3458 self.events.push_back(Event::HandshakeDataReady);
3459 }
3460 }
3461
3462 Ok(())
3463 }
3464
3465 fn write_crypto(&mut self) {
3466 loop {
3467 let space = self.highest_space;
3468 let mut outgoing = Vec::new();
3469 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3470 match space {
3471 SpaceId::Initial => {
3472 self.upgrade_crypto(SpaceId::Handshake, crypto);
3473 }
3474 SpaceId::Handshake => {
3475 self.upgrade_crypto(SpaceId::Data, crypto);
3476 }
3477 _ => unreachable!("got updated secrets during 1-RTT"),
3478 }
3479 }
3480 if outgoing.is_empty() {
3481 if space == self.highest_space {
3482 break;
3483 } else {
3484 continue;
3486 }
3487 }
3488 let offset = self.spaces[space].crypto_offset;
3489 let outgoing = Bytes::from(outgoing);
3490 if let Some(hs) = self.state.as_handshake_mut() {
3491 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3492 hs.client_hello = Some(outgoing.clone());
3493 }
3494 }
3495 self.spaces[space].crypto_offset += outgoing.len() as u64;
3496 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3497 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3498 offset,
3499 data: outgoing,
3500 });
3501 }
3502 }
3503
3504 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3506 debug_assert!(
3507 self.spaces[space].crypto.is_none(),
3508 "already reached packet space {space:?}"
3509 );
3510 trace!("{:?} keys ready", space);
3511 if space == SpaceId::Data {
3512 self.next_crypto = Some(
3514 self.crypto
3515 .next_1rtt_keys()
3516 .expect("handshake should be complete"),
3517 );
3518 }
3519
3520 self.spaces[space].crypto = Some(crypto);
3521 debug_assert!(space as usize > self.highest_space as usize);
3522 self.highest_space = space;
3523 if space == SpaceId::Data && self.side.is_client() {
3524 self.zero_rtt_crypto = None;
3526 }
3527 }
3528
3529 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3530 debug_assert!(space_id != SpaceId::Data);
3531 trace!("discarding {:?} keys", space_id);
3532 if space_id == SpaceId::Initial {
3533 if let ConnectionSide::Client { token, .. } = &mut self.side {
3535 *token = Bytes::new();
3536 }
3537 }
3538 let space = &mut self.spaces[space_id];
3539 space.crypto = None;
3540 let pns = space.for_path(PathId::ZERO);
3541 pns.time_of_last_ack_eliciting_packet = None;
3542 pns.loss_time = None;
3543 pns.loss_probes = 0;
3544 let sent_packets = mem::take(&mut pns.sent_packets);
3545 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3546 for (_, packet) in sent_packets.into_iter() {
3547 path.data.remove_in_flight(&packet);
3548 }
3549
3550 self.set_loss_detection_timer(now, PathId::ZERO)
3551 }
3552
3553 fn handle_coalesced(
3554 &mut self,
3555 now: Instant,
3556 remote: SocketAddr,
3557 path_id: PathId,
3558 ecn: Option<EcnCodepoint>,
3559 data: BytesMut,
3560 ) {
3561 self.path_data_mut(path_id)
3562 .inc_total_recvd(data.len() as u64);
3563 let mut remaining = Some(data);
3564 let cid_len = self
3565 .local_cid_state
3566 .values()
3567 .map(|cid_state| cid_state.cid_len())
3568 .next()
3569 .expect("one cid_state must exist");
3570 while let Some(data) = remaining {
3571 match PartialDecode::new(
3572 data,
3573 &FixedLengthConnectionIdParser::new(cid_len),
3574 &[self.version],
3575 self.endpoint_config.grease_quic_bit,
3576 ) {
3577 Ok((partial_decode, rest)) => {
3578 remaining = rest;
3579 self.handle_decode(now, remote, path_id, ecn, partial_decode);
3580 }
3581 Err(e) => {
3582 trace!("malformed header: {}", e);
3583 return;
3584 }
3585 }
3586 }
3587 }
3588
3589 fn handle_decode(
3590 &mut self,
3591 now: Instant,
3592 remote: SocketAddr,
3593 path_id: PathId,
3594 ecn: Option<EcnCodepoint>,
3595 partial_decode: PartialDecode,
3596 ) {
3597 let qlog = QlogRecvPacket::new(partial_decode.len());
3598 if let Some(decoded) = packet_crypto::unprotect_header(
3599 partial_decode,
3600 &self.spaces,
3601 self.zero_rtt_crypto.as_ref(),
3602 self.peer_params.stateless_reset_token,
3603 ) {
3604 self.handle_packet(
3605 now,
3606 remote,
3607 path_id,
3608 ecn,
3609 decoded.packet,
3610 decoded.stateless_reset,
3611 qlog,
3612 );
3613 }
3614 }
3615
3616 fn handle_packet(
3617 &mut self,
3618 now: Instant,
3619 remote: SocketAddr,
3620 path_id: PathId,
3621 ecn: Option<EcnCodepoint>,
3622 packet: Option<Packet>,
3623 stateless_reset: bool,
3624 mut qlog: QlogRecvPacket,
3625 ) {
3626 self.stats.udp_rx.ios += 1;
3627 if let Some(ref packet) = packet {
3628 trace!(
3629 "got {:?} packet ({} bytes) from {} using id {}",
3630 packet.header.space(),
3631 packet.payload.len() + packet.header_data.len(),
3632 remote,
3633 packet.header.dst_cid(),
3634 );
3635 }
3636
3637 if self.is_handshaking() {
3638 if path_id != PathId::ZERO {
3639 debug!(%remote, %path_id, "discarding multipath packet during handshake");
3640 return;
3641 }
3642 if remote != self.path_data_mut(path_id).remote {
3643 if let Some(hs) = self.state.as_handshake() {
3644 if hs.allow_server_migration {
3645 trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
3646 self.path_data_mut(path_id).remote = remote;
3647 self.qlog.emit_tuple_assigned(path_id, remote, now);
3648 } else {
3649 debug!("discarding packet with unexpected remote during handshake");
3650 return;
3651 }
3652 } else {
3653 debug!("discarding packet with unexpected remote during handshake");
3654 return;
3655 }
3656 }
3657 }
3658
3659 let was_closed = self.state.is_closed();
3660 let was_drained = self.state.is_drained();
3661
3662 let decrypted = match packet {
3663 None => Err(None),
3664 Some(mut packet) => self
3665 .decrypt_packet(now, path_id, &mut packet)
3666 .map(move |number| (packet, number)),
3667 };
3668 let result = match decrypted {
3669 _ if stateless_reset => {
3670 debug!("got stateless reset");
3671 Err(ConnectionError::Reset)
3672 }
3673 Err(Some(e)) => {
3674 warn!("illegal packet: {}", e);
3675 Err(e.into())
3676 }
3677 Err(None) => {
3678 debug!("failed to authenticate packet");
3679 self.authentication_failures += 1;
3680 let integrity_limit = self.spaces[self.highest_space]
3681 .crypto
3682 .as_ref()
3683 .unwrap()
3684 .packet
3685 .local
3686 .integrity_limit();
3687 if self.authentication_failures > integrity_limit {
3688 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3689 } else {
3690 return;
3691 }
3692 }
3693 Ok((packet, number)) => {
3694 qlog.header(&packet.header, number, path_id);
3695 let span = match number {
3696 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3697 None => trace_span!("recv", space = ?packet.header.space()),
3698 };
3699 let _guard = span.enter();
3700
3701 let dedup = self.spaces[packet.header.space()]
3702 .path_space_mut(path_id)
3703 .map(|pns| &mut pns.dedup);
3704 if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3705 debug!("discarding possible duplicate packet");
3706 self.qlog.emit_packet_received(qlog, now);
3707 return;
3708 } else if self.state.is_handshake() && packet.header.is_short() {
3709 trace!("dropping short packet during handshake");
3711 self.qlog.emit_packet_received(qlog, now);
3712 return;
3713 } else {
3714 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3715 if let Some(hs) = self.state.as_handshake() {
3716 if self.side.is_server() && token != &hs.expected_token {
3717 warn!("discarding Initial with invalid retry token");
3721 self.qlog.emit_packet_received(qlog, now);
3722 return;
3723 }
3724 }
3725 }
3726
3727 if !self.state.is_closed() {
3728 let spin = match packet.header {
3729 Header::Short { spin, .. } => spin,
3730 _ => false,
3731 };
3732
3733 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3734 self.ensure_path(path_id, remote, now, number);
3736 }
3737 if self.paths.contains_key(&path_id) {
3738 self.on_packet_authenticated(
3739 now,
3740 packet.header.space(),
3741 path_id,
3742 ecn,
3743 number,
3744 spin,
3745 packet.header.is_1rtt(),
3746 );
3747 }
3748 }
3749
3750 let res = self
3751 .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);
3752
3753 self.qlog.emit_packet_received(qlog, now);
3754 res
3755 }
3756 }
3757 };
3758
3759 if let Err(conn_err) = result {
3761 match conn_err {
3762 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3763 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3764 ConnectionError::Reset
3765 | ConnectionError::TransportError(TransportError {
3766 code: TransportErrorCode::AEAD_LIMIT_REACHED,
3767 ..
3768 }) => {
3769 self.state.move_to_drained(Some(conn_err));
3770 }
3771 ConnectionError::TimedOut => {
3772 unreachable!("timeouts aren't generated by packet processing");
3773 }
3774 ConnectionError::TransportError(err) => {
3775 debug!("closing connection due to transport error: {}", err);
3776 self.state.move_to_closed(err);
3777 }
3778 ConnectionError::VersionMismatch => {
3779 self.state.move_to_draining(Some(conn_err));
3780 }
3781 ConnectionError::LocallyClosed => {
3782 unreachable!("LocallyClosed isn't generated by packet processing");
3783 }
3784 ConnectionError::CidsExhausted => {
3785 unreachable!("CidsExhausted isn't generated by packet processing");
3786 }
3787 };
3788 }
3789
3790 if !was_closed && self.state.is_closed() {
3791 self.close_common();
3792 if !self.state.is_drained() {
3793 self.set_close_timer(now);
3794 }
3795 }
3796 if !was_drained && self.state.is_drained() {
3797 self.endpoint_events.push_back(EndpointEventInner::Drained);
3798 self.timers
3801 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3802 }
3803
3804 if matches!(self.state.as_type(), StateType::Closed) {
3806 let path_remote = self
3810 .paths
3811 .get(&path_id)
3812 .map(|p| p.data.remote)
3813 .unwrap_or(remote);
3814 self.close = remote == path_remote;
3815 }
3816 }
3817
3818 fn process_decrypted_packet(
3819 &mut self,
3820 now: Instant,
3821 remote: SocketAddr,
3822 path_id: PathId,
3823 number: Option<u64>,
3824 packet: Packet,
3825 qlog: &mut QlogRecvPacket,
3826 ) -> Result<(), ConnectionError> {
3827 if !self.paths.contains_key(&path_id) {
3828 trace!(%path_id, ?number, "discarding packet for unknown path");
3832 return Ok(());
3833 }
3834 let state = match self.state.as_type() {
3835 StateType::Established => {
3836 match packet.header.space() {
3837 SpaceId::Data => {
3838 self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
3839 }
3840 _ if packet.header.has_frames() => {
3841 self.process_early_payload(now, path_id, packet, qlog)?
3842 }
3843 _ => {
3844 trace!("discarding unexpected pre-handshake packet");
3845 }
3846 }
3847 return Ok(());
3848 }
3849 StateType::Closed => {
3850 for result in frame::Iter::new(packet.payload.freeze())? {
3851 let frame = match result {
3852 Ok(frame) => frame,
3853 Err(err) => {
3854 debug!("frame decoding error: {err:?}");
3855 continue;
3856 }
3857 };
3858 qlog.frame(&frame);
3859
3860 if let Frame::Padding = frame {
3861 continue;
3862 };
3863
3864 self.stats.frame_rx.record(&frame);
3865
3866 if let Frame::Close(_error) = frame {
3867 trace!("draining");
3868 self.state.move_to_draining(None);
3869 break;
3870 }
3871 }
3872 return Ok(());
3873 }
3874 StateType::Draining | StateType::Drained => return Ok(()),
3875 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3876 };
3877
3878 match packet.header {
3879 Header::Retry {
3880 src_cid: rem_cid, ..
3881 } => {
3882 debug_assert_eq!(path_id, PathId::ZERO);
3883 if self.side.is_server() {
3884 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3885 }
3886
3887 let is_valid_retry = self
3888 .rem_cids
3889 .get(&path_id)
3890 .map(|cids| cids.active())
3891 .map(|orig_dst_cid| {
3892 self.crypto.is_valid_retry(
3893 orig_dst_cid,
3894 &packet.header_data,
3895 &packet.payload,
3896 )
3897 })
3898 .unwrap_or_default();
3899 if self.total_authed_packets > 1
3900 || packet.payload.len() <= 16 || !is_valid_retry
3902 {
3903 trace!("discarding invalid Retry");
3904 return Ok(());
3912 }
3913
3914 trace!("retrying with CID {}", rem_cid);
3915 let client_hello = state.client_hello.take().unwrap();
3916 self.retry_src_cid = Some(rem_cid);
3917 self.rem_cids
3918 .get_mut(&path_id)
3919 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3920 .update_initial_cid(rem_cid);
3921 self.rem_handshake_cid = rem_cid;
3922
3923 let space = &mut self.spaces[SpaceId::Initial];
3924 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3925 self.on_packet_acked(now, PathId::ZERO, info);
3926 };
3927
3928 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = {
3931 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3932 space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3933 space.crypto_offset = client_hello.len() as u64;
3934 space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3935 .for_path(path_id)
3936 .next_packet_number;
3937 space.pending.crypto.push_back(frame::Crypto {
3938 offset: 0,
3939 data: client_hello,
3940 });
3941 space
3942 };
3943
3944 let zero_rtt = mem::take(
3946 &mut self.spaces[SpaceId::Data]
3947 .for_path(PathId::ZERO)
3948 .sent_packets,
3949 );
3950 for (_, info) in zero_rtt.into_iter() {
3951 self.paths
3952 .get_mut(&PathId::ZERO)
3953 .unwrap()
3954 .remove_in_flight(&info);
3955 self.spaces[SpaceId::Data].pending |= info.retransmits;
3956 }
3957 self.streams.retransmit_all_for_0rtt();
3958
3959 let token_len = packet.payload.len() - 16;
3960 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3961 unreachable!("we already short-circuited if we're server");
3962 };
3963 *token = packet.payload.freeze().split_to(token_len);
3964
3965 self.state = State::handshake(state::Handshake {
3966 expected_token: Bytes::new(),
3967 rem_cid_set: false,
3968 client_hello: None,
3969 allow_server_migration: true,
3970 });
3971 Ok(())
3972 }
3973 Header::Long {
3974 ty: LongType::Handshake,
3975 src_cid: rem_cid,
3976 dst_cid: loc_cid,
3977 ..
3978 } => {
3979 debug_assert_eq!(path_id, PathId::ZERO);
3980 if rem_cid != self.rem_handshake_cid {
3981 debug!(
3982 "discarding packet with mismatched remote CID: {} != {}",
3983 self.rem_handshake_cid, rem_cid
3984 );
3985 return Ok(());
3986 }
3987 self.on_path_validated(path_id);
3988
3989 self.process_early_payload(now, path_id, packet, qlog)?;
3990 if self.state.is_closed() {
3991 return Ok(());
3992 }
3993
3994 if self.crypto.is_handshaking() {
3995 trace!("handshake ongoing");
3996 return Ok(());
3997 }
3998
3999 if self.side.is_client() {
4000 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4002 TransportError::new(
4003 TransportErrorCode::crypto(0x6d),
4004 "transport parameters missing".to_owned(),
4005 )
4006 })?;
4007
4008 if self.has_0rtt() {
4009 if !self.crypto.early_data_accepted().unwrap() {
4010 debug_assert!(self.side.is_client());
4011 debug!("0-RTT rejected");
4012 self.accepted_0rtt = false;
4013 self.streams.zero_rtt_rejected();
4014
4015 self.spaces[SpaceId::Data].pending = Retransmits::default();
4017
4018 let sent_packets = mem::take(
4020 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
4021 );
4022 for (_, packet) in sent_packets.into_iter() {
4023 self.paths
4024 .get_mut(&path_id)
4025 .unwrap()
4026 .remove_in_flight(&packet);
4027 }
4028 } else {
4029 self.accepted_0rtt = true;
4030 params.validate_resumption_from(&self.peer_params)?;
4031 }
4032 }
4033 if let Some(token) = params.stateless_reset_token {
4034 let remote = self.path_data(path_id).remote;
4035 self.endpoint_events
4036 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4037 }
4038 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4039 self.issue_first_cids(now);
4040 } else {
4041 self.spaces[SpaceId::Data].pending.handshake_done = true;
4043 self.discard_space(now, SpaceId::Handshake);
4044 self.events.push_back(Event::HandshakeConfirmed);
4045 trace!("handshake confirmed");
4046 }
4047
4048 self.events.push_back(Event::Connected);
4049 self.state.move_to_established();
4050 trace!("established");
4051
4052 self.issue_first_path_cids(now);
4055 Ok(())
4056 }
4057 Header::Initial(InitialHeader {
4058 src_cid: rem_cid,
4059 dst_cid: loc_cid,
4060 ..
4061 }) => {
4062 debug_assert_eq!(path_id, PathId::ZERO);
4063 if !state.rem_cid_set {
4064 trace!("switching remote CID to {}", rem_cid);
4065 let mut state = state.clone();
4066 self.rem_cids
4067 .get_mut(&path_id)
4068 .expect("PathId::ZERO not yet abandoned")
4069 .update_initial_cid(rem_cid);
4070 self.rem_handshake_cid = rem_cid;
4071 self.orig_rem_cid = rem_cid;
4072 state.rem_cid_set = true;
4073 self.state.move_to_handshake(state);
4074 } else if rem_cid != self.rem_handshake_cid {
4075 debug!(
4076 "discarding packet with mismatched remote CID: {} != {}",
4077 self.rem_handshake_cid, rem_cid
4078 );
4079 return Ok(());
4080 }
4081
4082 let starting_space = self.highest_space;
4083 self.process_early_payload(now, path_id, packet, qlog)?;
4084
4085 if self.side.is_server()
4086 && starting_space == SpaceId::Initial
4087 && self.highest_space != SpaceId::Initial
4088 {
4089 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4090 TransportError::new(
4091 TransportErrorCode::crypto(0x6d),
4092 "transport parameters missing".to_owned(),
4093 )
4094 })?;
4095 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4096 self.issue_first_cids(now);
4097 self.init_0rtt(now);
4098 }
4099 Ok(())
4100 }
4101 Header::Long {
4102 ty: LongType::ZeroRtt,
4103 ..
4104 } => {
4105 self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
4106 Ok(())
4107 }
4108 Header::VersionNegotiate { .. } => {
4109 if self.total_authed_packets > 1 {
4110 return Ok(());
4111 }
4112 let supported = packet
4113 .payload
4114 .chunks(4)
4115 .any(|x| match <[u8; 4]>::try_from(x) {
4116 Ok(version) => self.version == u32::from_be_bytes(version),
4117 Err(_) => false,
4118 });
4119 if supported {
4120 return Ok(());
4121 }
4122 debug!("remote doesn't support our version");
4123 Err(ConnectionError::VersionMismatch)
4124 }
4125 Header::Short { .. } => unreachable!(
4126 "short packets received during handshake are discarded in handle_packet"
4127 ),
4128 }
4129 }
4130
4131 fn process_early_payload(
4133 &mut self,
4134 now: Instant,
4135 path_id: PathId,
4136 packet: Packet,
4137 #[allow(unused)] qlog: &mut QlogRecvPacket,
4138 ) -> Result<(), TransportError> {
4139 debug_assert_ne!(packet.header.space(), SpaceId::Data);
4140 debug_assert_eq!(path_id, PathId::ZERO);
4141 let payload_len = packet.payload.len();
4142 let mut ack_eliciting = false;
4143 for result in frame::Iter::new(packet.payload.freeze())? {
4144 let frame = result?;
4145 qlog.frame(&frame);
4146 let span = match frame {
4147 Frame::Padding => continue,
4148 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4149 };
4150
4151 self.stats.frame_rx.record(&frame);
4152
4153 let _guard = span.as_ref().map(|x| x.enter());
4154 ack_eliciting |= frame.is_ack_eliciting();
4155
4156 if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4158 return Err(TransportError::PROTOCOL_VIOLATION(
4159 "illegal frame type in handshake",
4160 ));
4161 }
4162
4163 match frame {
4164 Frame::Padding | Frame::Ping => {}
4165 Frame::Crypto(frame) => {
4166 self.read_crypto(packet.header.space(), &frame, payload_len)?;
4167 }
4168 Frame::Ack(ack) => {
4169 self.on_ack_received(now, packet.header.space(), ack)?;
4170 }
4171 Frame::PathAck(ack) => {
4172 span.as_ref()
4173 .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4174 self.on_path_ack_received(now, packet.header.space(), ack)?;
4175 }
4176 Frame::Close(reason) => {
4177 self.state.move_to_draining(Some(reason.into()));
4178 return Ok(());
4179 }
4180 _ => {
4181 let mut err =
4182 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4183 err.frame = Some(frame.ty());
4184 return Err(err);
4185 }
4186 }
4187 }
4188
4189 if ack_eliciting {
4190 self.spaces[packet.header.space()]
4192 .for_path(path_id)
4193 .pending_acks
4194 .set_immediate_ack_required();
4195 }
4196
4197 self.write_crypto();
4198 Ok(())
4199 }
4200
4201 fn process_payload(
4203 &mut self,
4204 now: Instant,
4205 remote: SocketAddr,
4206 path_id: PathId,
4207 number: u64,
4208 packet: Packet,
4209 #[allow(unused)] qlog: &mut QlogRecvPacket,
4210 ) -> Result<(), TransportError> {
4211 let payload = packet.payload.freeze();
4212 let mut is_probing_packet = true;
4213 let mut close = None;
4214 let payload_len = payload.len();
4215 let mut ack_eliciting = false;
4216 let mut migration_observed_addr = None;
4219 for result in frame::Iter::new(payload)? {
4220 let frame = result?;
4221 qlog.frame(&frame);
4222 let span = match frame {
4223 Frame::Padding => continue,
4224 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4225 };
4226
4227 self.stats.frame_rx.record(&frame);
4228 match &frame {
4231 Frame::Crypto(f) => {
4232 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4233 }
4234 Frame::Stream(f) => {
4235 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4236 }
4237 Frame::Datagram(f) => {
4238 trace!(len = f.data.len(), "got datagram frame");
4239 }
4240 f => {
4241 trace!("got frame {:?}", f);
4242 }
4243 }
4244
4245 let _guard = span.enter();
4246 if packet.header.is_0rtt() {
4247 match frame {
4248 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4249 return Err(TransportError::PROTOCOL_VIOLATION(
4250 "illegal frame type in 0-RTT",
4251 ));
4252 }
4253 _ => {
4254 if frame.is_1rtt() {
4255 return Err(TransportError::PROTOCOL_VIOLATION(
4256 "illegal frame type in 0-RTT",
4257 ));
4258 }
4259 }
4260 }
4261 }
4262 ack_eliciting |= frame.is_ack_eliciting();
4263
4264 match frame {
4266 Frame::Padding
4267 | Frame::PathChallenge(_)
4268 | Frame::PathResponse(_)
4269 | Frame::NewConnectionId(_)
4270 | Frame::ObservedAddr(_) => {}
4271 _ => {
4272 is_probing_packet = false;
4273 }
4274 }
4275
4276 match frame {
4277 Frame::Crypto(frame) => {
4278 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4279 }
4280 Frame::Stream(frame) => {
4281 if self.streams.received(frame, payload_len)?.should_transmit() {
4282 self.spaces[SpaceId::Data].pending.max_data = true;
4283 }
4284 }
4285 Frame::Ack(ack) => {
4286 self.on_ack_received(now, SpaceId::Data, ack)?;
4287 }
4288 Frame::PathAck(ack) => {
4289 span.record("path", tracing::field::debug(&ack.path_id));
4290 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4291 }
4292 Frame::Padding | Frame::Ping => {}
4293 Frame::Close(reason) => {
4294 close = Some(reason);
4295 }
4296 Frame::PathChallenge(challenge) => {
4297 let path = &mut self
4298 .path_mut(path_id)
4299 .expect("payload is processed only after the path becomes known");
4300 path.path_responses.push(number, challenge.0, remote);
4301 if remote == path.remote {
4302 match self.peer_supports_ack_frequency() {
4312 true => self.immediate_ack(path_id),
4313 false => {
4314 self.ping_path(path_id).ok();
4315 }
4316 }
4317 } else if self.iroh_hp.client_side().is_ok() {
4318 debug!("Potential Nat traversal PATH_CHALLENGE received");
4320 }
4321 }
4322 Frame::PathResponse(response) => {
4323 let path = self
4324 .paths
4325 .get_mut(&path_id)
4326 .expect("payload is processed only after the path becomes known");
4327
4328 match path.data.challenges_sent.get(&response.0) {
4329 Some(info) if info.remote == remote && path.data.remote == remote => {
4331 let sent_instant = info.sent_instant;
4332 self.timers.stop(
4334 Timer::PerPath(path_id, PathTimer::PathValidation),
4335 self.qlog.with_time(now),
4336 );
4337 self.timers.stop(
4338 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
4339 self.qlog.with_time(now),
4340 );
4341 if !path.data.validated {
4342 trace!("new path validated");
4343 }
4344 self.timers.stop(
4345 Timer::PerPath(path_id, PathTimer::PathOpen),
4346 self.qlog.with_time(now),
4347 );
4348 path.data
4350 .challenges_sent
4351 .retain(|_token, info| info.remote != remote);
4352 path.data.send_new_challenge = false;
4353 path.data.validated = true;
4354
4355 let rtt = now.saturating_duration_since(sent_instant);
4358 path.data.rtt.reset_initial_rtt(rtt);
4359
4360 self.events
4361 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4362 if !std::mem::replace(&mut path.data.open, true) {
4365 trace!("path opened");
4366 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4367 {
4368 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4369 id: path_id,
4370 addr: observed.socket_addr(),
4371 }));
4372 }
4373 }
4374 if let Some((_, ref mut prev)) = path.prev {
4375 prev.challenges_sent.clear();
4376 prev.send_new_challenge = false;
4377 }
4378 }
4379 Some(info) if info.remote == remote => {
4381 debug!("Response to off-path PathChallenge!");
4382 path.data
4383 .challenges_sent
4384 .retain(|_token, info| info.remote != remote);
4385 }
4386 Some(info) => {
4388 debug!(%response, from=%remote, expected=%info.remote, "ignoring invalid PATH_RESPONSE")
4389 }
4390 None => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4392 }
4393 }
4394 Frame::MaxData(bytes) => {
4395 self.streams.received_max_data(bytes);
4396 }
4397 Frame::MaxStreamData { id, offset } => {
4398 self.streams.received_max_stream_data(id, offset)?;
4399 }
4400 Frame::MaxStreams { dir, count } => {
4401 self.streams.received_max_streams(dir, count)?;
4402 }
4403 Frame::ResetStream(frame) => {
4404 if self.streams.received_reset(frame)?.should_transmit() {
4405 self.spaces[SpaceId::Data].pending.max_data = true;
4406 }
4407 }
4408 Frame::DataBlocked { offset } => {
4409 debug!(offset, "peer claims to be blocked at connection level");
4410 }
4411 Frame::StreamDataBlocked { id, offset } => {
4412 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4413 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4414 return Err(TransportError::STREAM_STATE_ERROR(
4415 "STREAM_DATA_BLOCKED on send-only stream",
4416 ));
4417 }
4418 debug!(
4419 stream = %id,
4420 offset, "peer claims to be blocked at stream level"
4421 );
4422 }
4423 Frame::StreamsBlocked { dir, limit } => {
4424 if limit > MAX_STREAM_COUNT {
4425 return Err(TransportError::FRAME_ENCODING_ERROR(
4426 "unrepresentable stream limit",
4427 ));
4428 }
4429 debug!(
4430 "peer claims to be blocked opening more than {} {} streams",
4431 limit, dir
4432 );
4433 }
4434 Frame::StopSending(frame::StopSending { id, error_code }) => {
4435 if id.initiator() != self.side.side() {
4436 if id.dir() == Dir::Uni {
4437 debug!("got STOP_SENDING on recv-only {}", id);
4438 return Err(TransportError::STREAM_STATE_ERROR(
4439 "STOP_SENDING on recv-only stream",
4440 ));
4441 }
4442 } else if self.streams.is_local_unopened(id) {
4443 return Err(TransportError::STREAM_STATE_ERROR(
4444 "STOP_SENDING on unopened stream",
4445 ));
4446 }
4447 self.streams.received_stop_sending(id, error_code);
4448 }
4449 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4450 if let Some(ref path_id) = path_id {
4451 span.record("path", tracing::field::debug(&path_id));
4452 }
4453 let path_id = path_id.unwrap_or_default();
4454 match self.local_cid_state.get_mut(&path_id) {
4455 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4456 Some(cid_state) => {
4457 let allow_more_cids = cid_state
4458 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4459
4460 let has_path = !self.abandoned_paths.contains(&path_id);
4464 let allow_more_cids = allow_more_cids && has_path;
4465
4466 self.endpoint_events
4467 .push_back(EndpointEventInner::RetireConnectionId(
4468 now,
4469 path_id,
4470 sequence,
4471 allow_more_cids,
4472 ));
4473 }
4474 }
4475 }
4476 Frame::NewConnectionId(frame) => {
4477 let path_id = if let Some(path_id) = frame.path_id {
4478 if !self.is_multipath_negotiated() {
4479 return Err(TransportError::PROTOCOL_VIOLATION(
4480 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4481 ));
4482 }
4483 if path_id > self.local_max_path_id {
4484 return Err(TransportError::PROTOCOL_VIOLATION(
4485 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4486 ));
4487 }
4488 path_id
4489 } else {
4490 PathId::ZERO
4491 };
4492
4493 if self.abandoned_paths.contains(&path_id) {
4494 trace!("ignoring issued CID for abandoned path");
4495 continue;
4496 }
4497 if let Some(ref path_id) = frame.path_id {
4498 span.record("path", tracing::field::debug(&path_id));
4499 }
4500 let rem_cids = self
4501 .rem_cids
4502 .entry(path_id)
4503 .or_insert_with(|| CidQueue::new(frame.id));
4504 if rem_cids.active().is_empty() {
4505 return Err(TransportError::PROTOCOL_VIOLATION(
4507 "NEW_CONNECTION_ID when CIDs aren't in use",
4508 ));
4509 }
4510 if frame.retire_prior_to > frame.sequence {
4511 return Err(TransportError::PROTOCOL_VIOLATION(
4512 "NEW_CONNECTION_ID retiring unissued CIDs",
4513 ));
4514 }
4515
4516 use crate::cid_queue::InsertError;
4517 match rem_cids.insert(frame) {
4518 Ok(None) => {}
4519 Ok(Some((retired, reset_token))) => {
4520 let pending_retired =
4521 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4522 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4525 if (pending_retired.len() as u64)
4528 .saturating_add(retired.end.saturating_sub(retired.start))
4529 > MAX_PENDING_RETIRED_CIDS
4530 {
4531 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4532 "queued too many retired CIDs",
4533 ));
4534 }
4535 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4536 self.set_reset_token(path_id, remote, reset_token);
4537 }
4538 Err(InsertError::ExceedsLimit) => {
4539 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4540 }
4541 Err(InsertError::Retired) => {
4542 trace!("discarding already-retired");
4543 self.spaces[SpaceId::Data]
4547 .pending
4548 .retire_cids
4549 .push((path_id, frame.sequence));
4550 continue;
4551 }
4552 };
4553
4554 if self.side.is_server()
4555 && path_id == PathId::ZERO
4556 && self
4557 .rem_cids
4558 .get(&PathId::ZERO)
4559 .map(|cids| cids.active_seq() == 0)
4560 .unwrap_or_default()
4561 {
4562 self.update_rem_cid(PathId::ZERO);
4565 }
4566 }
4567 Frame::NewToken(NewToken { token }) => {
4568 let ConnectionSide::Client {
4569 token_store,
4570 server_name,
4571 ..
4572 } = &self.side
4573 else {
4574 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4575 };
4576 if token.is_empty() {
4577 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4578 }
4579 trace!("got new token");
4580 token_store.insert(server_name, token);
4581 }
4582 Frame::Datagram(datagram) => {
4583 if self
4584 .datagrams
4585 .received(datagram, &self.config.datagram_receive_buffer_size)?
4586 {
4587 self.events.push_back(Event::DatagramReceived);
4588 }
4589 }
4590 Frame::AckFrequency(ack_frequency) => {
4591 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4594 continue;
4597 }
4598
4599 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4601 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4602
4603 if let Some(timeout) = space
4606 .pending_acks
4607 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4608 {
4609 self.timers.set(
4610 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4611 timeout,
4612 self.qlog.with_time(now),
4613 );
4614 }
4615 }
4616 }
4617 Frame::ImmediateAck => {
4618 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4620 pns.pending_acks.set_immediate_ack_required();
4621 }
4622 }
4623 Frame::HandshakeDone => {
4624 if self.side.is_server() {
4625 return Err(TransportError::PROTOCOL_VIOLATION(
4626 "client sent HANDSHAKE_DONE",
4627 ));
4628 }
4629 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4630 self.discard_space(now, SpaceId::Handshake);
4631 }
4632 self.events.push_back(Event::HandshakeConfirmed);
4633 trace!("handshake confirmed");
4634 }
4635 Frame::ObservedAddr(observed) => {
4636 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4638 if !self
4639 .peer_params
4640 .address_discovery_role
4641 .should_report(&self.config.address_discovery_role)
4642 {
4643 return Err(TransportError::PROTOCOL_VIOLATION(
4644 "received OBSERVED_ADDRESS frame when not negotiated",
4645 ));
4646 }
4647 if packet.header.space() != SpaceId::Data {
4649 return Err(TransportError::PROTOCOL_VIOLATION(
4650 "OBSERVED_ADDRESS frame outside data space",
4651 ));
4652 }
4653
4654 let path = self.path_data_mut(path_id);
4655 if remote == path.remote {
4656 if let Some(updated) = path.update_observed_addr_report(observed) {
4657 if path.open {
4658 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4659 id: path_id,
4660 addr: updated,
4661 }));
4662 }
4663 }
4665 } else {
4666 migration_observed_addr = Some(observed)
4668 }
4669 }
4670 Frame::PathAbandon(frame::PathAbandon {
4671 path_id,
4672 error_code,
4673 }) => {
4674 span.record("path", tracing::field::debug(&path_id));
4675 let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4677 Ok(()) => {
4678 trace!("peer abandoned path");
4679 false
4680 }
4681 Err(ClosePathError::LastOpenPath) => {
4682 trace!("peer abandoned last path, closing connection");
4683 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4685 }
4686 Err(ClosePathError::ClosedPath) => {
4687 trace!("peer abandoned already closed path");
4688 true
4689 }
4690 };
4691 if self.path(path_id).is_some() && !already_abandoned {
4696 let delay = self.pto(SpaceId::Data, path_id) * 3;
4701 self.timers.set(
4702 Timer::PerPath(path_id, PathTimer::DiscardPath),
4703 now + delay,
4704 self.qlog.with_time(now),
4705 );
4706 }
4707 }
4708 Frame::PathStatusAvailable(info) => {
4709 span.record("path", tracing::field::debug(&info.path_id));
4710 if self.is_multipath_negotiated() {
4711 self.on_path_status(
4712 info.path_id,
4713 PathStatus::Available,
4714 info.status_seq_no,
4715 );
4716 } else {
4717 return Err(TransportError::PROTOCOL_VIOLATION(
4718 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4719 ));
4720 }
4721 }
4722 Frame::PathStatusBackup(info) => {
4723 span.record("path", tracing::field::debug(&info.path_id));
4724 if self.is_multipath_negotiated() {
4725 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4726 } else {
4727 return Err(TransportError::PROTOCOL_VIOLATION(
4728 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4729 ));
4730 }
4731 }
4732 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4733 span.record("path", tracing::field::debug(&path_id));
4734 if !self.is_multipath_negotiated() {
4735 return Err(TransportError::PROTOCOL_VIOLATION(
4736 "received MAX_PATH_ID frame when multipath was not negotiated",
4737 ));
4738 }
4739 if path_id > self.remote_max_path_id {
4741 self.remote_max_path_id = path_id;
4742 self.issue_first_path_cids(now);
4743 }
4744 }
4745 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4746 if self.is_multipath_negotiated() {
4750 if self.local_max_path_id > max_path_id {
4751 return Err(TransportError::PROTOCOL_VIOLATION(
4752 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4753 ));
4754 }
4755 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4756 } else {
4758 return Err(TransportError::PROTOCOL_VIOLATION(
4759 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4760 ));
4761 }
4762 }
4763 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4764 if self.is_multipath_negotiated() {
4772 if path_id > self.local_max_path_id {
4773 return Err(TransportError::PROTOCOL_VIOLATION(
4774 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4775 ));
4776 }
4777 if next_seq.0
4778 > self
4779 .local_cid_state
4780 .get(&path_id)
4781 .map(|cid_state| cid_state.active_seq().1 + 1)
4782 .unwrap_or_default()
4783 {
4784 return Err(TransportError::PROTOCOL_VIOLATION(
4785 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4786 ));
4787 }
4788 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4789 } else {
4790 return Err(TransportError::PROTOCOL_VIOLATION(
4791 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4792 ));
4793 }
4794 }
4795 Frame::AddAddress(addr) => {
4796 let client_state = match self.iroh_hp.client_side_mut() {
4797 Ok(state) => state,
4798 Err(err) => {
4799 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4800 "Nat traversal(ADD_ADDRESS): {err}"
4801 )));
4802 }
4803 };
4804
4805 if !client_state.check_remote_address(&addr) {
4806 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4808 }
4809
4810 match client_state.add_remote_address(addr) {
4811 Ok(maybe_added) => {
4812 if let Some(added) = maybe_added {
4813 self.events.push_back(Event::NatTraversal(
4814 iroh_hp::Event::AddressAdded(added),
4815 ));
4816 }
4817 }
4818 Err(e) => {
4819 warn!(%e, "failed to add remote address")
4820 }
4821 }
4822 }
4823 Frame::RemoveAddress(addr) => {
4824 let client_state = match self.iroh_hp.client_side_mut() {
4825 Ok(state) => state,
4826 Err(err) => {
4827 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4828 "Nat traversal(REMOVE_ADDRESS): {err}"
4829 )));
4830 }
4831 };
4832 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4833 self.events
4834 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4835 removed_addr,
4836 )));
4837 }
4838 }
4839 Frame::ReachOut(reach_out) => {
4840 let server_state = match self.iroh_hp.server_side_mut() {
4841 Ok(state) => state,
4842 Err(err) => {
4843 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4844 "Nat traversal(REACH_OUT): {err}"
4845 )));
4846 }
4847 };
4848
4849 if let Err(err) = server_state.handle_reach_out(reach_out) {
4850 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4851 "Nat traversal(REACH_OUT): {err}"
4852 )));
4853 }
4854 }
4855 }
4856 }
4857
4858 let space = self.spaces[SpaceId::Data].for_path(path_id);
4859 if space
4860 .pending_acks
4861 .packet_received(now, number, ack_eliciting, &space.dedup)
4862 {
4863 if self.abandoned_paths.contains(&path_id) {
4864 space.pending_acks.set_immediate_ack_required();
4867 } else {
4868 self.timers.set(
4869 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4870 now + self.ack_frequency.max_ack_delay,
4871 self.qlog.with_time(now),
4872 );
4873 }
4874 }
4875
4876 let pending = &mut self.spaces[SpaceId::Data].pending;
4881 self.streams.queue_max_stream_id(pending);
4882
4883 if let Some(reason) = close {
4884 self.state.move_to_draining(Some(reason.into()));
4885 self.close = true;
4886 }
4887
4888 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4889 && !is_probing_packet
4890 && remote != self.path_data(path_id).remote
4891 {
4892 let ConnectionSide::Server { ref server_config } = self.side else {
4893 panic!("packets from unknown remote should be dropped by clients");
4894 };
4895 debug_assert!(
4896 server_config.migration,
4897 "migration-initiating packets should have been dropped immediately"
4898 );
4899 self.migrate(path_id, now, remote, migration_observed_addr);
4900 self.update_rem_cid(path_id);
4902 self.spin = false;
4903 }
4904
4905 Ok(())
4906 }
4907
4908 fn migrate(
4909 &mut self,
4910 path_id: PathId,
4911 now: Instant,
4912 remote: SocketAddr,
4913 observed_addr: Option<ObservedAddr>,
4914 ) {
4915 trace!(%remote, %path_id, "migration initiated");
4916 self.path_counter = self.path_counter.wrapping_add(1);
4917 let prev_pto = self.pto(SpaceId::Data, path_id);
4924 let known_path = self.paths.get_mut(&path_id).expect("known path");
4925 let path = &mut known_path.data;
4926 let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4927 PathData::from_previous(remote, path, self.path_counter, now)
4928 } else {
4929 let peer_max_udp_payload_size =
4930 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4931 .unwrap_or(u16::MAX);
4932 PathData::new(
4933 remote,
4934 self.allow_mtud,
4935 Some(peer_max_udp_payload_size),
4936 self.path_counter,
4937 now,
4938 &self.config,
4939 )
4940 };
4941 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4942 if let Some(report) = observed_addr {
4943 if let Some(updated) = new_path.update_observed_addr_report(report) {
4944 tracing::info!("adding observed addr event from migration");
4945 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4946 id: path_id,
4947 addr: updated,
4948 }));
4949 }
4950 }
4951 new_path.send_new_challenge = true;
4952
4953 let mut prev = mem::replace(path, new_path);
4954 if !prev.is_validating_path() {
4956 prev.send_new_challenge = true;
4957 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4961 }
4962
4963 self.qlog.emit_tuple_assigned(path_id, remote, now);
4965
4966 self.timers.set(
4967 Timer::PerPath(path_id, PathTimer::PathValidation),
4968 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4969 self.qlog.with_time(now),
4970 );
4971 }
4972
4973 pub fn local_address_changed(&mut self) {
4975 self.update_rem_cid(PathId::ZERO);
4977 self.ping();
4978 }
4979
4980 fn update_rem_cid(&mut self, path_id: PathId) {
4982 let Some((reset_token, retired)) =
4983 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4984 else {
4985 return;
4986 };
4987
4988 self.spaces[SpaceId::Data]
4990 .pending
4991 .retire_cids
4992 .extend(retired.map(|seq| (path_id, seq)));
4993 let remote = self.path_data(path_id).remote;
4994 self.set_reset_token(path_id, remote, reset_token);
4995 }
4996
4997 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
5006 self.endpoint_events
5007 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
5008
5009 if path_id == PathId::ZERO {
5015 self.peer_params.stateless_reset_token = Some(reset_token);
5016 }
5017 }
5018
5019 fn issue_first_cids(&mut self, now: Instant) {
5021 if self
5022 .local_cid_state
5023 .get(&PathId::ZERO)
5024 .expect("PathId::ZERO exists when the connection is created")
5025 .cid_len()
5026 == 0
5027 {
5028 return;
5029 }
5030
5031 let mut n = self.peer_params.issue_cids_limit() - 1;
5033 if let ConnectionSide::Server { server_config } = &self.side {
5034 if server_config.has_preferred_address() {
5035 n -= 1;
5037 }
5038 }
5039 self.endpoint_events
5040 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5041 }
5042
5043 fn issue_first_path_cids(&mut self, now: Instant) {
5047 if let Some(max_path_id) = self.max_path_id() {
5048 let mut path_id = self.max_path_id_with_cids.next();
5049 while path_id <= max_path_id {
5050 self.endpoint_events
5051 .push_back(EndpointEventInner::NeedIdentifiers(
5052 path_id,
5053 now,
5054 self.peer_params.issue_cids_limit(),
5055 ));
5056 path_id = path_id.next();
5057 }
5058 self.max_path_id_with_cids = max_path_id;
5059 }
5060 }
5061
5062 fn populate_packet(
5070 &mut self,
5071 now: Instant,
5072 space_id: SpaceId,
5073 path_id: PathId,
5074 path_exclusive_only: bool,
5075 buf: &mut impl BufMut,
5076 pn: u64,
5077 #[allow(unused)] qlog: &mut QlogSentPacket,
5078 ) -> SentFrames {
5079 let mut sent = SentFrames::default();
5080 let is_multipath_negotiated = self.is_multipath_negotiated();
5081 let space = &mut self.spaces[space_id];
5082 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5083 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5084 space
5085 .for_path(path_id)
5086 .pending_acks
5087 .maybe_ack_non_eliciting();
5088
5089 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5091 trace!("HANDSHAKE_DONE");
5092 buf.write(frame::FrameType::HANDSHAKE_DONE);
5093 qlog.frame(&Frame::HandshakeDone);
5094 sent.retransmits.get_or_create().handshake_done = true;
5095 self.stats.frame_tx.handshake_done =
5097 self.stats.frame_tx.handshake_done.saturating_add(1);
5098 }
5099
5100 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5103 while let Some(local_addr) = addresses.pop() {
5104 let reach_out = frame::ReachOut::new(*round, local_addr);
5105 if buf.remaining_mut() > reach_out.size() {
5106 trace!(%round, ?local_addr, "REACH_OUT");
5107 reach_out.write(buf);
5108 let sent_reachouts = sent
5109 .retransmits
5110 .get_or_create()
5111 .reach_out
5112 .get_or_insert_with(|| (*round, Default::default()));
5113 sent_reachouts.1.push(local_addr);
5114 self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5115 qlog.frame(&Frame::ReachOut(reach_out));
5116 } else {
5117 addresses.push(local_addr);
5118 break;
5119 }
5120 }
5121 if addresses.is_empty() {
5122 space.pending.reach_out = None;
5123 }
5124 }
5125
5126 if !path_exclusive_only
5128 && space_id == SpaceId::Data
5129 && self
5130 .config
5131 .address_discovery_role
5132 .should_report(&self.peer_params.address_discovery_role)
5133 && (!path.observed_addr_sent || space.pending.observed_addr)
5134 {
5135 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5136 if buf.remaining_mut() > frame.size() {
5137 trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5138 frame.write(buf);
5139
5140 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5141 path.observed_addr_sent = true;
5142
5143 self.stats.frame_tx.observed_addr += 1;
5144 sent.retransmits.get_or_create().observed_addr = true;
5145 space.pending.observed_addr = false;
5146 qlog.frame(&Frame::ObservedAddr(frame));
5147 }
5148 }
5149
5150 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5152 trace!("PING");
5153 buf.write(frame::FrameType::PING);
5154 sent.non_retransmits = true;
5155 self.stats.frame_tx.ping += 1;
5156 qlog.frame(&Frame::Ping);
5157 }
5158
5159 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5161 debug_assert_eq!(
5162 space_id,
5163 SpaceId::Data,
5164 "immediate acks must be sent in the data space"
5165 );
5166 trace!("IMMEDIATE_ACK");
5167 buf.write(frame::FrameType::IMMEDIATE_ACK);
5168 sent.non_retransmits = true;
5169 self.stats.frame_tx.immediate_ack += 1;
5170 qlog.frame(&Frame::ImmediateAck);
5171 }
5172
5173 if !path_exclusive_only {
5177 for path_id in space
5178 .number_spaces
5179 .iter_mut()
5180 .filter(|(_, pns)| pns.pending_acks.can_send())
5181 .map(|(&path_id, _)| path_id)
5182 .collect::<Vec<_>>()
5183 {
5184 Self::populate_acks(
5185 now,
5186 self.receiving_ecn,
5187 &mut sent,
5188 path_id,
5189 space_id,
5190 space,
5191 is_multipath_negotiated,
5192 buf,
5193 &mut self.stats,
5194 qlog,
5195 );
5196 }
5197 }
5198
5199 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5201 let sequence_number = self.ack_frequency.next_sequence_number();
5202
5203 let config = self.config.ack_frequency_config.as_ref().unwrap();
5205
5206 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5208 path.rtt.get(),
5209 config,
5210 &self.peer_params,
5211 );
5212
5213 trace!(?max_ack_delay, "ACK_FREQUENCY");
5214
5215 let frame = frame::AckFrequency {
5216 sequence: sequence_number,
5217 ack_eliciting_threshold: config.ack_eliciting_threshold,
5218 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5219 reordering_threshold: config.reordering_threshold,
5220 };
5221 frame.encode(buf);
5222 qlog.frame(&Frame::AckFrequency(frame));
5223
5224 sent.retransmits.get_or_create().ack_frequency = true;
5225
5226 self.ack_frequency
5227 .ack_frequency_sent(path_id, pn, max_ack_delay);
5228 self.stats.frame_tx.ack_frequency += 1;
5229 }
5230
5231 if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5233 && space_id == SpaceId::Data
5234 && path.send_new_challenge
5235 {
5236 path.send_new_challenge = false;
5237
5238 let token = self.rng.random();
5240 let info = paths::SentChallengeInfo {
5241 sent_instant: now,
5242 remote: path.remote,
5243 };
5244 path.challenges_sent.insert(token, info);
5245 sent.non_retransmits = true;
5246 sent.requires_padding = true;
5247 let challenge = frame::PathChallenge(token);
5248 trace!(%challenge, "sending new challenge");
5249 buf.write(challenge);
5250 qlog.frame(&Frame::PathChallenge(challenge));
5251 self.stats.frame_tx.path_challenge += 1;
5252 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5253 self.timers.set(
5254 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5255 now + pto,
5256 self.qlog.with_time(now),
5257 );
5258
5259 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5260 space.pending.path_status.insert(path_id);
5262 }
5263
5264 if space_id == SpaceId::Data
5267 && self
5268 .config
5269 .address_discovery_role
5270 .should_report(&self.peer_params.address_discovery_role)
5271 {
5272 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5273 if buf.remaining_mut() > frame.size() {
5274 frame.write(buf);
5275 qlog.frame(&Frame::ObservedAddr(frame));
5276
5277 self.next_observed_addr_seq_no =
5278 self.next_observed_addr_seq_no.saturating_add(1u8);
5279 path.observed_addr_sent = true;
5280
5281 self.stats.frame_tx.observed_addr += 1;
5282 sent.retransmits.get_or_create().observed_addr = true;
5283 space.pending.observed_addr = false;
5284 }
5285 }
5286 }
5287
5288 if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5290 if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5291 sent.non_retransmits = true;
5292 sent.requires_padding = true;
5293 let response = frame::PathResponse(token);
5294 trace!(%response, "sending response");
5295 buf.write(response);
5296 qlog.frame(&Frame::PathResponse(response));
5297 self.stats.frame_tx.path_response += 1;
5298
5299 if space_id == SpaceId::Data
5303 && self
5304 .config
5305 .address_discovery_role
5306 .should_report(&self.peer_params.address_discovery_role)
5307 {
5308 let frame =
5309 frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5310 if buf.remaining_mut() > frame.size() {
5311 frame.write(buf);
5312 qlog.frame(&Frame::ObservedAddr(frame));
5313
5314 self.next_observed_addr_seq_no =
5315 self.next_observed_addr_seq_no.saturating_add(1u8);
5316 path.observed_addr_sent = true;
5317
5318 self.stats.frame_tx.observed_addr += 1;
5319 sent.retransmits.get_or_create().observed_addr = true;
5320 space.pending.observed_addr = false;
5321 }
5322 }
5323 }
5324 }
5325
5326 while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5328 let mut frame = match space.pending.crypto.pop_front() {
5329 Some(x) => x,
5330 None => break,
5331 };
5332
5333 let max_crypto_data_size = buf.remaining_mut()
5338 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5340 - 2; let len = frame
5343 .data
5344 .len()
5345 .min(2usize.pow(14) - 1)
5346 .min(max_crypto_data_size);
5347
5348 let data = frame.data.split_to(len);
5349 let truncated = frame::Crypto {
5350 offset: frame.offset,
5351 data,
5352 };
5353 trace!(
5354 "CRYPTO: off {} len {}",
5355 truncated.offset,
5356 truncated.data.len()
5357 );
5358 truncated.encode(buf);
5359 self.stats.frame_tx.crypto += 1;
5360
5361 #[cfg(feature = "qlog")]
5363 qlog.frame(&Frame::Crypto(truncated.clone()));
5364 sent.retransmits.get_or_create().crypto.push_back(truncated);
5365 if !frame.data.is_empty() {
5366 frame.offset += len as u64;
5367 space.pending.crypto.push_front(frame);
5368 }
5369 }
5370
5371 while !path_exclusive_only
5374 && space_id == SpaceId::Data
5375 && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5376 {
5377 let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5378 break;
5379 };
5380 let frame = frame::PathAbandon {
5381 path_id,
5382 error_code,
5383 };
5384 frame.encode(buf);
5385 qlog.frame(&Frame::PathAbandon(frame));
5386 self.stats.frame_tx.path_abandon += 1;
5387 trace!(%path_id, "PATH_ABANDON");
5388 sent.retransmits
5389 .get_or_create()
5390 .path_abandon
5391 .entry(path_id)
5392 .or_insert(error_code);
5393 }
5394
5395 while !path_exclusive_only
5397 && space_id == SpaceId::Data
5398 && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5399 {
5400 let Some(path_id) = space.pending.path_status.pop_first() else {
5401 break;
5402 };
5403 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5404 trace!(%path_id, "discarding queued path status for unknown path");
5405 continue;
5406 };
5407
5408 let seq = path.status.seq();
5409 sent.retransmits.get_or_create().path_status.insert(path_id);
5410 match path.local_status() {
5411 PathStatus::Available => {
5412 let frame = frame::PathStatusAvailable {
5413 path_id,
5414 status_seq_no: seq,
5415 };
5416 frame.encode(buf);
5417 qlog.frame(&Frame::PathStatusAvailable(frame));
5418 self.stats.frame_tx.path_status_available += 1;
5419 trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5420 }
5421 PathStatus::Backup => {
5422 let frame = frame::PathStatusBackup {
5423 path_id,
5424 status_seq_no: seq,
5425 };
5426 frame.encode(buf);
5427 qlog.frame(&Frame::PathStatusBackup(frame));
5428 self.stats.frame_tx.path_status_backup += 1;
5429 trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5430 }
5431 }
5432 }
5433
5434 if space_id == SpaceId::Data
5436 && space.pending.max_path_id
5437 && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5438 {
5439 let frame = frame::MaxPathId(self.local_max_path_id);
5440 frame.encode(buf);
5441 qlog.frame(&Frame::MaxPathId(frame));
5442 space.pending.max_path_id = false;
5443 sent.retransmits.get_or_create().max_path_id = true;
5444 trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5445 self.stats.frame_tx.max_path_id += 1;
5446 }
5447
5448 if space_id == SpaceId::Data
5450 && space.pending.paths_blocked
5451 && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5452 {
5453 let frame = frame::PathsBlocked(self.remote_max_path_id);
5454 frame.encode(buf);
5455 qlog.frame(&Frame::PathsBlocked(frame));
5456 space.pending.paths_blocked = false;
5457 sent.retransmits.get_or_create().paths_blocked = true;
5458 trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5459 self.stats.frame_tx.paths_blocked += 1;
5460 }
5461
5462 while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5464 {
5465 let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5466 break;
5467 };
5468 let next_seq = match self.rem_cids.get(&path_id) {
5469 Some(cid_queue) => cid_queue.active_seq() + 1,
5470 None => 0,
5471 };
5472 let frame = frame::PathCidsBlocked {
5473 path_id,
5474 next_seq: VarInt(next_seq),
5475 };
5476 frame.encode(buf);
5477 qlog.frame(&Frame::PathCidsBlocked(frame));
5478 sent.retransmits
5479 .get_or_create()
5480 .path_cids_blocked
5481 .push(path_id);
5482 trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5483 self.stats.frame_tx.path_cids_blocked += 1;
5484 }
5485
5486 if space_id == SpaceId::Data {
5488 self.streams.write_control_frames(
5489 buf,
5490 &mut space.pending,
5491 &mut sent.retransmits,
5492 &mut self.stats.frame_tx,
5493 qlog,
5494 );
5495 }
5496
5497 let cid_len = self
5499 .local_cid_state
5500 .values()
5501 .map(|cid_state| cid_state.cid_len())
5502 .max()
5503 .expect("some local CID state must exist");
5504 let new_cid_size_bound =
5505 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5506 while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5507 let issued = match space.pending.new_cids.pop() {
5508 Some(x) => x,
5509 None => break,
5510 };
5511 let retire_prior_to = self
5512 .local_cid_state
5513 .get(&issued.path_id)
5514 .map(|cid_state| cid_state.retire_prior_to())
5515 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5516
5517 let cid_path_id = match is_multipath_negotiated {
5518 true => {
5519 trace!(
5520 path_id = ?issued.path_id,
5521 sequence = issued.sequence,
5522 id = %issued.id,
5523 "PATH_NEW_CONNECTION_ID",
5524 );
5525 self.stats.frame_tx.path_new_connection_id += 1;
5526 Some(issued.path_id)
5527 }
5528 false => {
5529 trace!(
5530 sequence = issued.sequence,
5531 id = %issued.id,
5532 "NEW_CONNECTION_ID"
5533 );
5534 debug_assert_eq!(issued.path_id, PathId::ZERO);
5535 self.stats.frame_tx.new_connection_id += 1;
5536 None
5537 }
5538 };
5539 let frame = frame::NewConnectionId {
5540 path_id: cid_path_id,
5541 sequence: issued.sequence,
5542 retire_prior_to,
5543 id: issued.id,
5544 reset_token: issued.reset_token,
5545 };
5546 frame.encode(buf);
5547 sent.retransmits.get_or_create().new_cids.push(issued);
5548 qlog.frame(&Frame::NewConnectionId(frame));
5549 }
5550
5551 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5553 while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5554 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5555 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5556 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5557 self.stats.frame_tx.retire_connection_id += 1;
5558 (None, seq)
5559 }
5560 Some((path_id, seq)) => {
5561 trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5562 self.stats.frame_tx.path_retire_connection_id += 1;
5563 (Some(path_id), seq)
5564 }
5565 None => break,
5566 };
5567 let frame = frame::RetireConnectionId { path_id, sequence };
5568 frame.encode(buf);
5569 qlog.frame(&Frame::RetireConnectionId(frame));
5570 sent.retransmits
5571 .get_or_create()
5572 .retire_cids
5573 .push((path_id.unwrap_or_default(), sequence));
5574 }
5575
5576 let mut sent_datagrams = false;
5578 while !path_exclusive_only
5579 && buf.remaining_mut() > Datagram::SIZE_BOUND
5580 && space_id == SpaceId::Data
5581 {
5582 let prev_remaining = buf.remaining_mut();
5583 match self.datagrams.write(buf) {
5584 true => {
5585 sent_datagrams = true;
5586 sent.non_retransmits = true;
5587 self.stats.frame_tx.datagram += 1;
5588 qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5589 }
5590 false => break,
5591 }
5592 }
5593 if self.datagrams.send_blocked && sent_datagrams {
5594 self.events.push_back(Event::DatagramsUnblocked);
5595 self.datagrams.send_blocked = false;
5596 }
5597
5598 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5599
5600 while let Some(remote_addr) = space.pending.new_tokens.pop() {
5602 if path_exclusive_only {
5603 break;
5604 }
5605 debug_assert_eq!(space_id, SpaceId::Data);
5606 let ConnectionSide::Server { server_config } = &self.side else {
5607 panic!("NEW_TOKEN frames should not be enqueued by clients");
5608 };
5609
5610 if remote_addr != path.remote {
5611 continue;
5616 }
5617
5618 let token = Token::new(
5619 TokenPayload::Validation {
5620 ip: remote_addr.ip(),
5621 issued: server_config.time_source.now(),
5622 },
5623 &mut self.rng,
5624 );
5625 let new_token = NewToken {
5626 token: token.encode(&*server_config.token_key).into(),
5627 };
5628
5629 if buf.remaining_mut() < new_token.size() {
5630 space.pending.new_tokens.push(remote_addr);
5631 break;
5632 }
5633
5634 trace!("NEW_TOKEN");
5635 new_token.encode(buf);
5636 qlog.frame(&Frame::NewToken(new_token));
5637 sent.retransmits
5638 .get_or_create()
5639 .new_tokens
5640 .push(remote_addr);
5641 self.stats.frame_tx.new_token += 1;
5642 }
5643
5644 if !path_exclusive_only && space_id == SpaceId::Data {
5646 sent.stream_frames =
5647 self.streams
5648 .write_stream_frames(buf, self.config.send_fairness, qlog);
5649 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5650 }
5651
5652 while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5655 if let Some(added_address) = space.pending.add_address.pop_last() {
5656 trace!(
5657 seq = %added_address.seq_no,
5658 ip = ?added_address.ip,
5659 port = added_address.port,
5660 "ADD_ADDRESS",
5661 );
5662 added_address.write(buf);
5663 sent.retransmits
5664 .get_or_create()
5665 .add_address
5666 .insert(added_address);
5667 self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5668 qlog.frame(&Frame::AddAddress(added_address));
5669 } else {
5670 break;
5671 }
5672 }
5673
5674 while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5676 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5677 trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5678 removed_address.write(buf);
5679 sent.retransmits
5680 .get_or_create()
5681 .remove_address
5682 .insert(removed_address);
5683 self.stats.frame_tx.remove_address =
5684 self.stats.frame_tx.remove_address.saturating_add(1);
5685 qlog.frame(&Frame::RemoveAddress(removed_address));
5686 } else {
5687 break;
5688 }
5689 }
5690
5691 sent
5692 }
5693
5694 fn populate_acks(
5696 now: Instant,
5697 receiving_ecn: bool,
5698 sent: &mut SentFrames,
5699 path_id: PathId,
5700 space_id: SpaceId,
5701 space: &mut PacketSpace,
5702 is_multipath_negotiated: bool,
5703 buf: &mut impl BufMut,
5704 stats: &mut ConnectionStats,
5705 #[allow(unused)] qlog: &mut QlogSentPacket,
5706 ) {
5707 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5709
5710 debug_assert!(
5711 is_multipath_negotiated || path_id == PathId::ZERO,
5712 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5713 );
5714 if is_multipath_negotiated {
5715 debug_assert!(
5716 space_id == SpaceId::Data || path_id == PathId::ZERO,
5717 "path acks must be sent in 1RTT space (have {space_id:?})"
5718 );
5719 }
5720
5721 let pns = space.for_path(path_id);
5722 let ranges = pns.pending_acks.ranges();
5723 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5724 let ecn = if receiving_ecn {
5725 Some(&pns.ecn_counters)
5726 } else {
5727 None
5728 };
5729 if let Some(max) = ranges.max() {
5730 sent.largest_acked.insert(path_id, max);
5731 }
5732
5733 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5734 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5736 let delay = delay_micros >> ack_delay_exp.into_inner();
5737
5738 if is_multipath_negotiated && space_id == SpaceId::Data {
5739 if !ranges.is_empty() {
5740 trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5741 frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5742 qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5743 stats.frame_tx.path_acks += 1;
5744 }
5745 } else {
5746 trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5747 frame::Ack::encode(delay as _, ranges, ecn, buf);
5748 stats.frame_tx.acks += 1;
5749 qlog.frame_ack(delay, ranges, ecn);
5750 }
5751 }
5752
5753 fn close_common(&mut self) {
5754 trace!("connection closed");
5755 self.timers.reset();
5756 }
5757
5758 fn set_close_timer(&mut self, now: Instant) {
5759 self.timers.set(
5762 Timer::Conn(ConnTimer::Close),
5763 now + 3 * self.pto_max_path(self.highest_space),
5764 self.qlog.with_time(now),
5765 );
5766 }
5767
5768 fn handle_peer_params(
5773 &mut self,
5774 params: TransportParameters,
5775 loc_cid: ConnectionId,
5776 rem_cid: ConnectionId,
5777 now: Instant,
5778 ) -> Result<(), TransportError> {
5779 if Some(self.orig_rem_cid) != params.initial_src_cid
5780 || (self.side.is_client()
5781 && (Some(self.initial_dst_cid) != params.original_dst_cid
5782 || self.retry_src_cid != params.retry_src_cid))
5783 {
5784 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5785 "CID authentication failure",
5786 ));
5787 }
5788 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5789 return Err(TransportError::PROTOCOL_VIOLATION(
5790 "multipath must not use zero-length CIDs",
5791 ));
5792 }
5793
5794 self.set_peer_params(params);
5795 self.qlog.emit_peer_transport_params_received(self, now);
5796
5797 Ok(())
5798 }
5799
5800 fn set_peer_params(&mut self, params: TransportParameters) {
5801 self.streams.set_params(¶ms);
5802 self.idle_timeout =
5803 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5804 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5805
5806 if let Some(ref info) = params.preferred_address {
5807 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5809 path_id: None,
5810 sequence: 1,
5811 id: info.connection_id,
5812 reset_token: info.stateless_reset_token,
5813 retire_prior_to: 0,
5814 })
5815 .expect(
5816 "preferred address CID is the first received, and hence is guaranteed to be legal",
5817 );
5818 let remote = self.path_data(PathId::ZERO).remote;
5819 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5820 }
5821 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5822
5823 let mut multipath_enabled = None;
5824 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5825 self.config.get_initial_max_path_id(),
5826 params.initial_max_path_id,
5827 ) {
5828 self.local_max_path_id = local_max_path_id;
5830 self.remote_max_path_id = remote_max_path_id;
5831 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5832 debug!(%initial_max_path_id, "multipath negotiated");
5833 multipath_enabled = Some(initial_max_path_id);
5834 }
5835
5836 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5837 self.config
5838 .max_remote_nat_traversal_addresses
5839 .zip(params.max_remote_nat_traversal_addresses)
5840 {
5841 if let Some(max_initial_paths) =
5842 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5843 {
5844 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5845 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5846 self.iroh_hp =
5847 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5848 debug!(
5849 %max_remote_addresses, %max_local_addresses,
5850 "iroh hole punching negotiated"
5851 );
5852
5853 match self.side() {
5854 Side::Client => {
5855 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5856 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5859 } else if max_local_addresses as u64
5860 > params.active_connection_id_limit.into_inner()
5861 {
5862 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5866 }
5867 }
5868 Side::Server => {
5869 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5870 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5871 }
5872 }
5873 }
5874 } else {
5875 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5876 }
5877 }
5878
5879 self.peer_params = params;
5880 let peer_max_udp_payload_size =
5881 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5882 self.path_data_mut(PathId::ZERO)
5883 .mtud
5884 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5885 }
5886
5887 fn decrypt_packet(
5889 &mut self,
5890 now: Instant,
5891 path_id: PathId,
5892 packet: &mut Packet,
5893 ) -> Result<Option<u64>, Option<TransportError>> {
5894 let result = packet_crypto::decrypt_packet_body(
5895 packet,
5896 path_id,
5897 &self.spaces,
5898 self.zero_rtt_crypto.as_ref(),
5899 self.key_phase,
5900 self.prev_crypto.as_ref(),
5901 self.next_crypto.as_ref(),
5902 )?;
5903
5904 let result = match result {
5905 Some(r) => r,
5906 None => return Ok(None),
5907 };
5908
5909 if result.outgoing_key_update_acked {
5910 if let Some(prev) = self.prev_crypto.as_mut() {
5911 prev.end_packet = Some((result.number, now));
5912 self.set_key_discard_timer(now, packet.header.space());
5913 }
5914 }
5915
5916 if result.incoming_key_update {
5917 trace!("key update authenticated");
5918 self.update_keys(Some((result.number, now)), true);
5919 self.set_key_discard_timer(now, packet.header.space());
5920 }
5921
5922 Ok(Some(result.number))
5923 }
5924
5925 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5926 trace!("executing key update");
5927 let new = self
5931 .crypto
5932 .next_1rtt_keys()
5933 .expect("only called for `Data` packets");
5934 self.key_phase_size = new
5935 .local
5936 .confidentiality_limit()
5937 .saturating_sub(KEY_UPDATE_MARGIN);
5938 let old = mem::replace(
5939 &mut self.spaces[SpaceId::Data]
5940 .crypto
5941 .as_mut()
5942 .unwrap() .packet,
5944 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5945 );
5946 self.spaces[SpaceId::Data]
5947 .iter_paths_mut()
5948 .for_each(|s| s.sent_with_keys = 0);
5949 self.prev_crypto = Some(PrevCrypto {
5950 crypto: old,
5951 end_packet,
5952 update_unacked: remote,
5953 });
5954 self.key_phase = !self.key_phase;
5955 }
5956
5957 fn peer_supports_ack_frequency(&self) -> bool {
5958 self.peer_params.min_ack_delay.is_some()
5959 }
5960
5961 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5966 debug_assert_eq!(
5967 self.highest_space,
5968 SpaceId::Data,
5969 "immediate ack must be written in the data space"
5970 );
5971 self.spaces[self.highest_space]
5972 .for_path(path_id)
5973 .immediate_ack_pending = true;
5974 }
5975
5976 #[cfg(test)]
5978 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5979 let (path_id, first_decode, remaining) = match &event.0 {
5980 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5981 path_id,
5982 first_decode,
5983 remaining,
5984 ..
5985 }) => (path_id, first_decode, remaining),
5986 _ => return None,
5987 };
5988
5989 if remaining.is_some() {
5990 panic!("Packets should never be coalesced in tests");
5991 }
5992
5993 let decrypted_header = packet_crypto::unprotect_header(
5994 first_decode.clone(),
5995 &self.spaces,
5996 self.zero_rtt_crypto.as_ref(),
5997 self.peer_params.stateless_reset_token,
5998 )?;
5999
6000 let mut packet = decrypted_header.packet?;
6001 packet_crypto::decrypt_packet_body(
6002 &mut packet,
6003 *path_id,
6004 &self.spaces,
6005 self.zero_rtt_crypto.as_ref(),
6006 self.key_phase,
6007 self.prev_crypto.as_ref(),
6008 self.next_crypto.as_ref(),
6009 )
6010 .ok()?;
6011
6012 Some(packet.payload.to_vec())
6013 }
6014
6015 #[cfg(test)]
6018 pub(crate) fn bytes_in_flight(&self) -> u64 {
6019 self.path_data(PathId::ZERO).in_flight.bytes
6021 }
6022
6023 #[cfg(test)]
6025 pub(crate) fn congestion_window(&self) -> u64 {
6026 let path = self.path_data(PathId::ZERO);
6027 path.congestion
6028 .window()
6029 .saturating_sub(path.in_flight.bytes)
6030 }
6031
6032 #[cfg(test)]
6034 pub(crate) fn is_idle(&self) -> bool {
6035 let current_timers = self.timers.values();
6036 current_timers
6037 .into_iter()
6038 .filter(|(timer, _)| {
6039 !matches!(
6040 timer,
6041 Timer::Conn(ConnTimer::KeepAlive)
6042 | Timer::PerPath(_, PathTimer::PathKeepAlive)
6043 | Timer::Conn(ConnTimer::PushNewCid)
6044 | Timer::Conn(ConnTimer::KeyDiscard)
6045 )
6046 })
6047 .min_by_key(|(_, time)| *time)
6048 .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6049 }
6050
6051 #[cfg(test)]
6053 pub(crate) fn using_ecn(&self) -> bool {
6054 self.path_data(PathId::ZERO).sending_ecn
6055 }
6056
6057 #[cfg(test)]
6059 pub(crate) fn total_recvd(&self) -> u64 {
6060 self.path_data(PathId::ZERO).total_recvd
6061 }
6062
6063 #[cfg(test)]
6064 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6065 self.local_cid_state
6066 .get(&PathId::ZERO)
6067 .unwrap()
6068 .active_seq()
6069 }
6070
6071 #[cfg(test)]
6072 #[track_caller]
6073 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6074 self.local_cid_state
6075 .get(&PathId(path_id))
6076 .unwrap()
6077 .active_seq()
6078 }
6079
6080 #[cfg(test)]
6083 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6084 let n = self
6085 .local_cid_state
6086 .get_mut(&PathId::ZERO)
6087 .unwrap()
6088 .assign_retire_seq(v);
6089 self.endpoint_events
6090 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6091 }
6092
6093 #[cfg(test)]
6095 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6096 self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6097 }
6098
6099 #[cfg(test)]
6101 pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6102 self.path_data(path_id).current_mtu()
6103 }
6104
6105 #[cfg(test)]
6107 pub(crate) fn trigger_path_validation(&mut self) {
6108 for path in self.paths.values_mut() {
6109 path.data.send_new_challenge = true;
6110 }
6111 }
6112
6113 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6124 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6125 path.data.send_new_challenge
6126 || path
6127 .prev
6128 .as_ref()
6129 .is_some_and(|(_, path)| path.send_new_challenge)
6130 || !path.data.path_responses.is_empty()
6131 });
6132 let other = self.streams.can_send_stream_data()
6133 || self
6134 .datagrams
6135 .outgoing
6136 .front()
6137 .is_some_and(|x| x.size(true) <= max_size);
6138 SendableFrames {
6139 acks: false,
6140 other,
6141 close: false,
6142 path_exclusive,
6143 }
6144 }
6145
6146 fn kill(&mut self, reason: ConnectionError) {
6148 self.close_common();
6149 self.state.move_to_drained(Some(reason));
6150 self.endpoint_events.push_back(EndpointEventInner::Drained);
6151 }
6152
6153 pub fn current_mtu(&self) -> u16 {
6160 self.paths
6161 .iter()
6162 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6163 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6164 .min()
6165 .expect("There is always at least one available path")
6166 }
6167
6168 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6175 let pn_len = PacketNumber::new(
6176 pn,
6177 self.spaces[SpaceId::Data]
6178 .for_path(path)
6179 .largest_acked_packet
6180 .unwrap_or(0),
6181 )
6182 .len();
6183
6184 1 + self
6186 .rem_cids
6187 .get(&path)
6188 .map(|cids| cids.active().len())
6189 .unwrap_or(20) + pn_len
6191 + self.tag_len_1rtt()
6192 }
6193
6194 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6195 let pn_len = 4;
6196
6197 let cid_len = self
6198 .rem_cids
6199 .values()
6200 .map(|cids| cids.active().len())
6201 .max()
6202 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6206 }
6207
6208 fn tag_len_1rtt(&self) -> usize {
6209 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6210 Some(crypto) => Some(&*crypto.packet.local),
6211 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6212 };
6213 key.map_or(16, |x| x.tag_len())
6217 }
6218
6219 fn on_path_validated(&mut self, path_id: PathId) {
6221 self.path_data_mut(path_id).validated = true;
6222 let ConnectionSide::Server { server_config } = &self.side else {
6223 return;
6224 };
6225 let remote_addr = self.path_data(path_id).remote;
6226 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6227 new_tokens.clear();
6228 for _ in 0..server_config.validation_token.sent {
6229 new_tokens.push(remote_addr);
6230 }
6231 }
6232
6233 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6235 if let Some(path) = self.paths.get_mut(&path_id) {
6236 path.data.status.remote_update(status, status_seq_no);
6237 } else {
6238 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6239 }
6240 self.events.push_back(
6241 PathEvent::RemoteStatus {
6242 id: path_id,
6243 status,
6244 }
6245 .into(),
6246 );
6247 }
6248
6249 fn max_path_id(&self) -> Option<PathId> {
6258 if self.is_multipath_negotiated() {
6259 Some(self.remote_max_path_id.min(self.local_max_path_id))
6260 } else {
6261 None
6262 }
6263 }
6264
6265 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6267 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6268 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6269 };
6270 Ok(())
6271 }
6272
6273 pub fn remove_nat_traversal_address(
6277 &mut self,
6278 address: SocketAddr,
6279 ) -> Result<(), iroh_hp::Error> {
6280 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6281 self.spaces[SpaceId::Data]
6282 .pending
6283 .remove_address
6284 .insert(removed);
6285 }
6286 Ok(())
6287 }
6288
6289 pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6291 self.iroh_hp.get_local_nat_traversal_addresses()
6292 }
6293
6294 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6296 Ok(self
6297 .iroh_hp
6298 .client_side()?
6299 .get_remote_nat_traversal_addresses())
6300 }
6301
6302 pub fn initiate_nat_traversal_round(
6310 &mut self,
6311 now: Instant,
6312 ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6313 let client_state = self.iroh_hp.client_side_mut()?;
6314 let iroh_hp::NatTraversalRound {
6315 new_round,
6316 reach_out_at,
6317 addresses_to_probe,
6318 prev_round_path_ids,
6319 } = client_state.initiate_nat_traversal_round()?;
6320
6321 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6322
6323 for path_id in prev_round_path_ids {
6324 let validated = self
6327 .path(path_id)
6328 .map(|path| path.validated)
6329 .unwrap_or(false);
6330
6331 if !validated {
6332 let _ = self.close_path(
6333 now,
6334 path_id,
6335 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6336 );
6337 }
6338 }
6339
6340 let mut err = None;
6341
6342 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6343 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6344 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6345
6346 for (ip, port) in addresses_to_probe {
6347 let remote = match ip {
6349 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6350 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6351 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6352 IpAddr::V6(_) => {
6353 trace!("not using IPv6 nat candidate for IPv4 socket");
6354 continue;
6355 }
6356 };
6357 match self.open_path_ensure(remote, PathStatus::Backup, now) {
6358 Ok((path_id, path_was_known)) if !path_was_known => {
6359 path_ids.push(path_id);
6360 probed_addresses.push(remote);
6361 }
6362 Ok((path_id, _)) => {
6363 trace!(%path_id, %remote,"nat traversal: path existed for remote")
6364 }
6365 Err(e) => {
6366 debug!(%remote, %e,"nat traversal: failed to probe remote");
6367 err.get_or_insert(e);
6368 }
6369 }
6370 }
6371
6372 if let Some(err) = err {
6373 if probed_addresses.is_empty() {
6375 return Err(iroh_hp::Error::Multipath(err));
6376 }
6377 }
6378
6379 self.iroh_hp
6380 .client_side_mut()
6381 .expect("connection side validated")
6382 .set_round_path_ids(path_ids);
6383
6384 Ok(probed_addresses)
6385 }
6386}
6387
6388impl fmt::Debug for Connection {
6389 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6390 f.debug_struct("Connection")
6391 .field("handshake_cid", &self.handshake_cid)
6392 .finish()
6393 }
6394}
6395
6396#[derive(Debug, Copy, Clone, PartialEq, Eq)]
6397enum PathBlocked {
6398 No,
6399 AntiAmplification,
6400 Congestion,
6401 Pacing,
6402}
6403
6404enum ConnectionSide {
6406 Client {
6407 token: Bytes,
6409 token_store: Arc<dyn TokenStore>,
6410 server_name: String,
6411 },
6412 Server {
6413 server_config: Arc<ServerConfig>,
6414 },
6415}
6416
6417impl ConnectionSide {
6418 fn remote_may_migrate(&self, state: &State) -> bool {
6419 match self {
6420 Self::Server { server_config } => server_config.migration,
6421 Self::Client { .. } => {
6422 if let Some(hs) = state.as_handshake() {
6423 hs.allow_server_migration
6424 } else {
6425 false
6426 }
6427 }
6428 }
6429 }
6430
6431 fn is_client(&self) -> bool {
6432 self.side().is_client()
6433 }
6434
6435 fn is_server(&self) -> bool {
6436 self.side().is_server()
6437 }
6438
6439 fn side(&self) -> Side {
6440 match *self {
6441 Self::Client { .. } => Side::Client,
6442 Self::Server { .. } => Side::Server,
6443 }
6444 }
6445}
6446
6447impl From<SideArgs> for ConnectionSide {
6448 fn from(side: SideArgs) -> Self {
6449 match side {
6450 SideArgs::Client {
6451 token_store,
6452 server_name,
6453 } => Self::Client {
6454 token: token_store.take(&server_name).unwrap_or_default(),
6455 token_store,
6456 server_name,
6457 },
6458 SideArgs::Server {
6459 server_config,
6460 pref_addr_cid: _,
6461 path_validated: _,
6462 } => Self::Server { server_config },
6463 }
6464 }
6465}
6466
6467pub(crate) enum SideArgs {
6469 Client {
6470 token_store: Arc<dyn TokenStore>,
6471 server_name: String,
6472 },
6473 Server {
6474 server_config: Arc<ServerConfig>,
6475 pref_addr_cid: Option<ConnectionId>,
6476 path_validated: bool,
6477 },
6478}
6479
6480impl SideArgs {
6481 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6482 match *self {
6483 Self::Client { .. } => None,
6484 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6485 }
6486 }
6487
6488 pub(crate) fn path_validated(&self) -> bool {
6489 match *self {
6490 Self::Client { .. } => true,
6491 Self::Server { path_validated, .. } => path_validated,
6492 }
6493 }
6494
6495 pub(crate) fn side(&self) -> Side {
6496 match *self {
6497 Self::Client { .. } => Side::Client,
6498 Self::Server { .. } => Side::Server,
6499 }
6500 }
6501}
6502
6503#[derive(Debug, Error, Clone, PartialEq, Eq)]
6505pub enum ConnectionError {
6506 #[error("peer doesn't implement any supported version")]
6508 VersionMismatch,
6509 #[error(transparent)]
6511 TransportError(#[from] TransportError),
6512 #[error("aborted by peer: {0}")]
6514 ConnectionClosed(frame::ConnectionClose),
6515 #[error("closed by peer: {0}")]
6517 ApplicationClosed(frame::ApplicationClose),
6518 #[error("reset by peer")]
6520 Reset,
6521 #[error("timed out")]
6527 TimedOut,
6528 #[error("closed")]
6530 LocallyClosed,
6531 #[error("CIDs exhausted")]
6535 CidsExhausted,
6536}
6537
6538impl From<Close> for ConnectionError {
6539 fn from(x: Close) -> Self {
6540 match x {
6541 Close::Connection(reason) => Self::ConnectionClosed(reason),
6542 Close::Application(reason) => Self::ApplicationClosed(reason),
6543 }
6544 }
6545}
6546
6547impl From<ConnectionError> for io::Error {
6549 fn from(x: ConnectionError) -> Self {
6550 use ConnectionError::*;
6551 let kind = match x {
6552 TimedOut => io::ErrorKind::TimedOut,
6553 Reset => io::ErrorKind::ConnectionReset,
6554 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6555 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6556 io::ErrorKind::Other
6557 }
6558 };
6559 Self::new(kind, x)
6560 }
6561}
6562
6563#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6566pub enum PathError {
6567 #[error("multipath extension not negotiated")]
6569 MultipathNotNegotiated,
6570 #[error("the server side may not open a path")]
6572 ServerSideNotAllowed,
6573 #[error("maximum number of concurrent paths reached")]
6575 MaxPathIdReached,
6576 #[error("remoted CIDs exhausted")]
6578 RemoteCidsExhausted,
6579 #[error("path validation failed")]
6581 ValidationFailed,
6582 #[error("invalid remote address")]
6584 InvalidRemoteAddress(SocketAddr),
6585}
6586
6587#[derive(Debug, Error, Clone, Eq, PartialEq)]
6589pub enum ClosePathError {
6590 #[error("closed path")]
6592 ClosedPath,
6593 #[error("last open path")]
6595 LastOpenPath,
6596}
6597
6598#[derive(Debug, Error, Clone, Copy)]
6599#[error("Multipath extension not negotiated")]
6600pub struct MultipathNotNegotiated {
6601 _private: (),
6602}
6603
6604#[derive(Debug)]
6606pub enum Event {
6607 HandshakeDataReady,
6609 Connected,
6611 HandshakeConfirmed,
6613 ConnectionLost {
6617 reason: ConnectionError,
6619 },
6620 Stream(StreamEvent),
6622 DatagramReceived,
6624 DatagramsUnblocked,
6626 Path(PathEvent),
6628 NatTraversal(iroh_hp::Event),
6630}
6631
6632impl From<PathEvent> for Event {
6633 fn from(source: PathEvent) -> Self {
6634 Self::Path(source)
6635 }
6636}
6637
6638fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6639 Duration::from_micros(params.max_ack_delay.0 * 1000)
6640}
6641
6642const MAX_BACKOFF_EXPONENT: u32 = 16;
6644
6645const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6653
6654const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6660 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6661
6662const KEY_UPDATE_MARGIN: u64 = 10_000;
6666
6667#[derive(Default)]
6668struct SentFrames {
6669 retransmits: ThinRetransmits,
6670 largest_acked: FxHashMap<PathId, u64>,
6672 stream_frames: StreamMetaVec,
6673 non_retransmits: bool,
6675 requires_padding: bool,
6677}
6678
6679impl SentFrames {
6680 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6682 !self.largest_acked.is_empty()
6683 && !self.non_retransmits
6684 && self.stream_frames.is_empty()
6685 && self.retransmits.is_empty(streams)
6686 }
6687}
6688
6689fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6697 match (x, y) {
6698 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6699 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6700 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6701 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6702 }
6703}
6704
6705#[cfg(test)]
6706mod tests {
6707 use super::*;
6708
6709 #[test]
6710 fn negotiate_max_idle_timeout_commutative() {
6711 let test_params = [
6712 (None, None, None),
6713 (None, Some(VarInt(0)), None),
6714 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6715 (Some(VarInt(0)), Some(VarInt(0)), None),
6716 (
6717 Some(VarInt(2)),
6718 Some(VarInt(0)),
6719 Some(Duration::from_millis(2)),
6720 ),
6721 (
6722 Some(VarInt(1)),
6723 Some(VarInt(4)),
6724 Some(Duration::from_millis(1)),
6725 ),
6726 ];
6727
6728 for (left, right, result) in test_params {
6729 assert_eq!(negotiate_max_idle_timeout(left, right), result);
6730 assert_eq!(negotiate_max_idle_timeout(right, left), result);
6731 }
6732 }
6733}