1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, FourTuple, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE,
22 MAX_STREAM_COUNT, MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit,
23 TransportError, TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 config::{ServerConfig, TransportConfig},
27 congestion::Controller,
28 connection::{
29 qlog::{QlogRecvPacket, QlogSink},
30 spaces::LostPacket,
31 timer::{ConnTimer, PathTimer},
32 },
33 crypto::{self, KeyPair, Keys, PacketKey},
34 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
35 iroh_hp,
36 packet::{
37 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
38 PacketNumber, PartialDecode, SpaceId,
39 },
40 range_set::ArrayRangeSet,
41 shared::{
42 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
43 EndpointEvent, EndpointEventInner,
44 },
45 token::{ResetToken, Token, TokenPayload},
46 transport_parameters::TransportParameters,
47};
48
49mod ack_frequency;
50use ack_frequency::AckFrequencyState;
51
52mod assembler;
53pub use assembler::Chunk;
54
55mod cid_state;
56use cid_state::CidState;
57
58mod datagrams;
59use datagrams::DatagramState;
60pub use datagrams::{Datagrams, SendDatagramError};
61
62mod mtud;
63mod pacing;
64
65mod packet_builder;
66use packet_builder::{PacketBuilder, PadDatagram};
67
68mod packet_crypto;
69use packet_crypto::{PrevCrypto, ZeroRttCrypto};
70
71mod paths;
72pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
73use paths::{PathData, PathState};
74
75pub(crate) mod qlog;
76pub(crate) mod send_buffer;
77
78mod spaces;
79#[cfg(fuzzing)]
80pub use spaces::Retransmits;
81#[cfg(not(fuzzing))]
82use spaces::Retransmits;
83use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
84
85mod stats;
86pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
87
88mod streams;
89#[cfg(fuzzing)]
90pub use streams::StreamsState;
91#[cfg(not(fuzzing))]
92use streams::StreamsState;
93pub use streams::{
94 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
95 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
96};
97
98mod timer;
99use timer::{Timer, TimerTable};
100
101mod transmit_buf;
102use transmit_buf::TransmitBuf;
103
104mod state;
105
106#[cfg(not(fuzzing))]
107use state::State;
108#[cfg(fuzzing)]
109pub use state::State;
110use state::StateType;
111
/// Protocol state of a single connection, covering all of its paths.
///
/// The struct performs no I/O by itself: outgoing datagrams are produced by
/// [`Connection::poll_transmit`], application events are drained with [`Connection::poll`],
/// endpoint events with [`Connection::poll_endpoint_events`], and the next timer deadline is
/// reported by [`Connection::poll_timeout`].
pub struct Connection {
    /// Endpoint-level configuration handed to `new`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration for this connection.
    config: Arc<TransportConfig>,
    /// Random number generator, seeded from the `rng_seed` passed to `new`.
    rng: StdRng,
    /// Cryptographic session; `new` asks it for the Initial keys.
    crypto: Box<dyn crypto::Session>,
    /// Set from the `loc_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// Set from the `rem_cid` argument of `new`.
    rem_handshake_cid: ConnectionId,
    /// All known paths, keyed by path ID. `PathId::ZERO` is inserted by `new`.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented (wrapping) by `ensure_path` for every path it creates; the new value is
    /// passed to `PathData::new`.
    path_generation_counter: u64,
    /// Forwarded to `PathData::new` for every path (presumably enables MTU discovery).
    allow_mtud: bool,
    /// Connection state machine; starts out in the handshake state.
    state: State,
    /// Side-specific (client/server) state.
    side: ConnectionSide,
    /// Starts as `false`.
    zero_rtt_enabled: bool,
    /// Starts as `None`.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Starts as `false`.
    key_phase: bool,
    /// Initialised to a random value in `10..1000`.
    key_phase_size: u64,
    /// Transport parameters of the peer; `TransportParameters::default()` until replaced.
    peer_params: TransportParameters,
    /// Set from the `rem_cid` argument of `new`.
    orig_rem_cid: ConnectionId,
    /// Set from the `init_cid` argument of `new`, which is also used to derive the Initial keys.
    initial_dst_cid: ConnectionId,
    /// Starts as `None`.
    retry_src_cid: Option<ConnectionId>,
    /// Application-facing events, returned first by `poll`.
    events: VecDeque<Event>,
    /// Events for the endpoint, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// `true` only if `config.allow_spin` is set and a 7-in-8 random draw succeeds.
    spin_enabled: bool,
    /// Starts as `false`.
    spin: bool,
    /// Packet spaces in the order Initial, Handshake, Data.
    spaces: [PacketSpace; 3],
    /// Starts as `SpaceId::Initial`; `poll_transmit` stops sending close packets once it has
    /// sent one in this space.
    highest_space: SpaceId,
    /// Starts as `None`.
    prev_crypto: Option<PrevCrypto>,
    /// Starts as `None`.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Starts as `false`.
    accepted_0rtt: bool,
    /// Starts as `true`.
    permit_idle_reset: bool,
    /// Idle timeout derived from `config.max_idle_timeout` (milliseconds); `None` when that
    /// value is unset or zero.
    idle_timeout: Option<Duration>,
    /// Timer table; its next deadline is what `poll_timeout` returns.
    timers: TimerTable,
    /// Starts at zero.
    authentication_failures: u64,

    /// While the connection is draining or closed, `poll_transmit` only produces a close
    /// packet when this is set, and clears it after sending one in `highest_space`.
    close: bool,

    /// ACK frequency state, initialised from the max ACK delay of the default transport
    /// parameters.
    ack_frequency: AckFrequencyState,

    /// Passed to `populate_acks` when building packets.
    receiving_ecn: bool,
    /// Starts at zero.
    total_authed_packets: u64,
    /// Set by `poll_transmit`: `true` when nothing was written and sending was not blocked by
    /// the congestion check.
    app_limited: bool,

    /// Starts at zero.
    next_observed_addr_seq_no: VarInt,

    /// Stream state shared by all paths.
    streams: StreamsState,
    /// Remote connection IDs per path; a path without an entry cannot be sent on.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// State of locally issued connection IDs per path.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// Datagram state.
    datagrams: DatagramState,
    /// Connection-level statistics.
    stats: ConnectionStats,
    /// Per-path statistics, created lazily.
    path_stats: FxHashMap<PathId, PathStats>,
    /// Protocol version passed to `new`.
    version: u32,

    /// Starts at `NonZeroU32::MIN`.
    max_concurrent_paths: NonZeroU32,
    /// Starts at `PathId::ZERO`; `close_path` raises it by one through `set_max_path_id`.
    local_max_path_id: PathId,
    /// Starts at `PathId::ZERO`; `open_path` refuses path IDs above it.
    remote_max_path_id: PathId,
    /// Starts at `PathId::ZERO`.
    max_path_id_with_cids: PathId,
    /// IDs of paths that were abandoned; `open_path` never reuses these IDs.
    abandoned_paths: FxHashSet<PathId>,

    /// State of the `iroh_hp` extension; its server side can request probes from
    /// `poll_transmit`.
    iroh_hp: iroh_hp::State,
    /// Sink for qlog events.
    qlog: QlogSink,
}
323
324impl Connection {
325 pub(crate) fn new(
326 endpoint_config: Arc<EndpointConfig>,
327 config: Arc<TransportConfig>,
328 init_cid: ConnectionId,
329 loc_cid: ConnectionId,
330 rem_cid: ConnectionId,
331 network_path: FourTuple,
332 crypto: Box<dyn crypto::Session>,
333 cid_gen: &dyn ConnectionIdGenerator,
334 now: Instant,
335 version: u32,
336 allow_mtud: bool,
337 rng_seed: [u8; 32],
338 side_args: SideArgs,
339 qlog: QlogSink,
340 ) -> Self {
341 let pref_addr_cid = side_args.pref_addr_cid();
342 let path_validated = side_args.path_validated();
343 let connection_side = ConnectionSide::from(side_args);
344 let side = connection_side.side();
345 let mut rng = StdRng::from_seed(rng_seed);
346 let initial_space = {
347 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
348 space.crypto = Some(crypto.initial_keys(init_cid, side));
349 space
350 };
351 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
352 #[cfg(test)]
353 let data_space = match config.deterministic_packet_numbers {
354 true => PacketSpace::new_deterministic(now, SpaceId::Data),
355 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
356 };
357 #[cfg(not(test))]
358 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
359 let state = State::handshake(state::Handshake {
360 rem_cid_set: side.is_server(),
361 expected_token: Bytes::new(),
362 client_hello: None,
363 allow_server_migration: side.is_client(),
364 });
365 let local_cid_state = FxHashMap::from_iter([(
366 PathId::ZERO,
367 CidState::new(
368 cid_gen.cid_len(),
369 cid_gen.cid_lifetime(),
370 now,
371 if pref_addr_cid.is_some() { 2 } else { 1 },
372 ),
373 )]);
374
375 let mut path = PathData::new(network_path, allow_mtud, None, 0, now, &config);
376 path.open = true;
378 let mut this = Self {
379 endpoint_config,
380 crypto,
381 handshake_cid: loc_cid,
382 rem_handshake_cid: rem_cid,
383 local_cid_state,
384 paths: BTreeMap::from_iter([(
385 PathId::ZERO,
386 PathState {
387 data: path,
388 prev: None,
389 },
390 )]),
391 path_generation_counter: 0,
392 allow_mtud,
393 state,
394 side: connection_side,
395 zero_rtt_enabled: false,
396 zero_rtt_crypto: None,
397 key_phase: false,
398 key_phase_size: rng.random_range(10..1000),
405 peer_params: TransportParameters::default(),
406 orig_rem_cid: rem_cid,
407 initial_dst_cid: init_cid,
408 retry_src_cid: None,
409 events: VecDeque::new(),
410 endpoint_events: VecDeque::new(),
411 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
412 spin: false,
413 spaces: [initial_space, handshake_space, data_space],
414 highest_space: SpaceId::Initial,
415 prev_crypto: None,
416 next_crypto: None,
417 accepted_0rtt: false,
418 permit_idle_reset: true,
419 idle_timeout: match config.max_idle_timeout {
420 None | Some(VarInt(0)) => None,
421 Some(dur) => Some(Duration::from_millis(dur.0)),
422 },
423 timers: TimerTable::default(),
424 authentication_failures: 0,
425 close: false,
426
427 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
428 &TransportParameters::default(),
429 )),
430
431 app_limited: false,
432 receiving_ecn: false,
433 total_authed_packets: 0,
434
435 next_observed_addr_seq_no: 0u32.into(),
436
437 streams: StreamsState::new(
438 side,
439 config.max_concurrent_uni_streams,
440 config.max_concurrent_bidi_streams,
441 config.send_window,
442 config.receive_window,
443 config.stream_receive_window,
444 ),
445 datagrams: DatagramState::default(),
446 config,
447 rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
448 rng,
449 stats: ConnectionStats::default(),
450 path_stats: Default::default(),
451 version,
452
453 max_concurrent_paths: NonZeroU32::MIN,
455 local_max_path_id: PathId::ZERO,
456 remote_max_path_id: PathId::ZERO,
457 max_path_id_with_cids: PathId::ZERO,
458 abandoned_paths: Default::default(),
459
460 iroh_hp: Default::default(),
462 qlog,
463 };
464 if path_validated {
465 this.on_path_validated(PathId::ZERO);
466 }
467 if side.is_client() {
468 this.write_crypto();
470 this.init_0rtt(now);
471 }
472 this.qlog
473 .emit_tuple_assigned(PathId::ZERO, network_path, now);
474 this
475 }
476
/// Returns the next deadline stored in the connection's timer table, if any.
///
/// The value is whatever `TimerTable::peek` reports; `None` means no timer is pending.
#[must_use]
pub fn poll_timeout(&mut self) -> Option<Instant> {
    self.timers.peek()
}
488
489 #[must_use]
495 pub fn poll(&mut self) -> Option<Event> {
496 if let Some(x) = self.events.pop_front() {
497 return Some(x);
498 }
499
500 if let Some(event) = self.streams.poll() {
501 return Some(Event::Stream(event));
502 }
503
504 if let Some(reason) = self.state.take_error() {
505 return Some(Event::ConnectionLost { reason });
506 }
507
508 None
509 }
510
511 #[must_use]
513 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
514 self.endpoint_events.pop_front().map(EndpointEvent)
515 }
516
/// Returns a [`Streams`] handle that borrows the connection's stream state mutably and its
/// connection state immutably.
#[must_use]
pub fn streams(&mut self) -> Streams<'_> {
    Streams {
        state: &mut self.streams,
        conn_state: &self.state,
    }
}
525
/// Returns a [`RecvStream`] handle for the receiving side of stream `id`.
///
/// The handle borrows the stream state and the pending data of the Data packet space.
///
/// # Panics
///
/// Panics if `id` is not bidirectional and was initiated by this side of the connection,
/// i.e. it is a stream this side cannot receive on.
#[must_use]
pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
    assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
    RecvStream {
        id,
        state: &mut self.streams,
        pending: &mut self.spaces[SpaceId::Data].pending,
    }
}
536
/// Returns a [`SendStream`] handle for the sending side of stream `id`.
///
/// The handle borrows the stream state, the pending data of the Data packet space and the
/// connection state.
///
/// # Panics
///
/// Panics if `id` is not bidirectional and was not initiated by this side of the
/// connection, i.e. it is a stream this side cannot send on.
#[must_use]
pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
    assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
    SendStream {
        id,
        state: &mut self.streams,
        pending: &mut self.spaces[SpaceId::Data].pending,
        conn_state: &self.state,
    }
}
548
549 pub fn open_path_ensure(
566 &mut self,
567 network_path: FourTuple,
568 initial_status: PathStatus,
569 now: Instant,
570 ) -> Result<(PathId, bool), PathError> {
571 Ok(
572 match self
573 .paths
574 .iter()
575 .find(|(_id, path)| network_path.is_probably_same_path(&path.data.network_path))
576 {
577 Some((path_id, _state)) => (*path_id, true),
578 None => (self.open_path(network_path, initial_status, now)?, false),
579 },
580 )
581 }
582
583 pub fn open_path(
588 &mut self,
589 network_path: FourTuple,
590 initial_status: PathStatus,
591 now: Instant,
592 ) -> Result<PathId, PathError> {
593 if !self.is_multipath_negotiated() {
594 return Err(PathError::MultipathNotNegotiated);
595 }
596 if self.side().is_server() {
597 return Err(PathError::ServerSideNotAllowed);
598 }
599
600 let max_abandoned = self.abandoned_paths.iter().max().copied();
601 let max_used = self.paths.keys().last().copied();
602 let path_id = max_abandoned
603 .max(max_used)
604 .unwrap_or(PathId::ZERO)
605 .saturating_add(1u8);
606
607 if Some(path_id) > self.max_path_id() {
608 return Err(PathError::MaxPathIdReached);
609 }
610 if path_id > self.remote_max_path_id {
611 self.spaces[SpaceId::Data].pending.paths_blocked = true;
612 return Err(PathError::MaxPathIdReached);
613 }
614 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
615 self.spaces[SpaceId::Data]
616 .pending
617 .path_cids_blocked
618 .insert(path_id);
619 return Err(PathError::RemoteCidsExhausted);
620 }
621
622 let path = self.ensure_path(path_id, network_path, now, None);
623 path.status.local_update(initial_status);
624
625 Ok(path_id)
626 }
627
628 pub fn close_path(
634 &mut self,
635 now: Instant,
636 path_id: PathId,
637 error_code: VarInt,
638 ) -> Result<(), ClosePathError> {
639 if self.abandoned_paths.contains(&path_id)
640 || Some(path_id) > self.max_path_id()
641 || !self.paths.contains_key(&path_id)
642 {
643 return Err(ClosePathError::ClosedPath);
644 }
645 if self
646 .paths
647 .iter()
648 .any(|(id, path)| {
650 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
651 })
652 .not()
653 {
654 return Err(ClosePathError::LastOpenPath);
655 }
656
657 self.spaces[SpaceId::Data]
659 .pending
660 .path_abandon
661 .insert(path_id, error_code.into());
662
663 let pending_space = &mut self.spaces[SpaceId::Data].pending;
665 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
666 pending_space.path_cids_blocked.retain(|&id| id != path_id);
667 pending_space.path_status.retain(|&id| id != path_id);
668
669 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
671 for sent_packet in space.sent_packets.values_mut() {
672 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
673 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
674 retransmits.path_cids_blocked.retain(|&id| id != path_id);
675 retransmits.path_status.retain(|&id| id != path_id);
676 }
677 }
678 }
679
680 self.rem_cids.remove(&path_id);
686 self.endpoint_events
687 .push_back(EndpointEventInner::RetireResetToken(path_id));
688
689 let pto = self.pto_max_path(SpaceId::Data, false);
690
691 let path = self.paths.get_mut(&path_id).expect("checked above");
692
693 path.data.last_allowed_receive = Some(now + 3 * pto);
695 self.abandoned_paths.insert(path_id);
696
697 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
698
699 self.timers.set(
704 Timer::PerPath(path_id, PathTimer::DiscardPath),
705 now + 6 * pto,
706 self.qlog.with_time(now),
707 );
708 Ok(())
709 }
710
711 #[track_caller]
715 fn path_data(&self, path_id: PathId) -> &PathData {
716 if let Some(data) = self.paths.get(&path_id) {
717 &data.data
718 } else {
719 panic!(
720 "unknown path: {path_id}, currently known paths: {:?}",
721 self.paths.keys().collect::<Vec<_>>()
722 );
723 }
724 }
725
726 fn path(&self, path_id: PathId) -> Option<&PathData> {
728 self.paths.get(&path_id).map(|path_state| &path_state.data)
729 }
730
731 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
733 self.paths
734 .get_mut(&path_id)
735 .map(|path_state| &mut path_state.data)
736 }
737
/// Returns the IDs of all paths currently in the path table, in ascending order.
///
/// The list is not filtered, so paths that were abandoned but are still present in the
/// table are included.
pub fn paths(&self) -> Vec<PathId> {
    self.paths.keys().copied().collect()
}
744
745 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
747 self.path(path_id)
748 .map(PathData::local_status)
749 .ok_or(ClosedPath { _private: () })
750 }
751
752 pub fn network_path(&self, path_id: PathId) -> Result<FourTuple, ClosedPath> {
754 self.path(path_id)
755 .map(|path| path.network_path)
756 .ok_or(ClosedPath { _private: () })
757 }
758
759 pub fn set_path_status(
763 &mut self,
764 path_id: PathId,
765 status: PathStatus,
766 ) -> Result<PathStatus, SetPathStatusError> {
767 if !self.is_multipath_negotiated() {
768 return Err(SetPathStatusError::MultipathNotNegotiated);
769 }
770 let path = self
771 .path_mut(path_id)
772 .ok_or(SetPathStatusError::ClosedPath)?;
773 let prev = match path.status.local_update(status) {
774 Some(prev) => {
775 self.spaces[SpaceId::Data]
776 .pending
777 .path_status
778 .insert(path_id);
779 prev
780 }
781 None => path.local_status(),
782 };
783 Ok(prev)
784 }
785
786 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
791 self.path(path_id).and_then(|path| path.remote_status())
792 }
793
794 pub fn set_path_max_idle_timeout(
800 &mut self,
801 path_id: PathId,
802 timeout: Option<Duration>,
803 ) -> Result<Option<Duration>, ClosedPath> {
804 let path = self
805 .paths
806 .get_mut(&path_id)
807 .ok_or(ClosedPath { _private: () })?;
808 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
809 }
810
811 pub fn set_path_keep_alive_interval(
817 &mut self,
818 path_id: PathId,
819 interval: Option<Duration>,
820 ) -> Result<Option<Duration>, ClosedPath> {
821 let path = self
822 .paths
823 .get_mut(&path_id)
824 .ok_or(ClosedPath { _private: () })?;
825 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
826 }
827
/// Returns the [`PathData`] for `path_id` mutably.
///
/// # Panics
///
/// Panics with "known path" if the path is not in the path table. The panic location is
/// attributed to the caller.
#[track_caller]
fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
    &mut self.paths.get_mut(&path_id).expect("known path").data
}
835
836 fn find_validated_path_on_network_path(
840 &self,
841 network_path: FourTuple,
842 ) -> Option<(&PathId, &PathState)> {
843 self.paths.iter().find(|(path_id, path_state)| {
844 path_state.data.validated
845 && network_path.is_probably_same_path(&path_state.data.network_path)
847 && !self.abandoned_paths.contains(path_id)
848 })
849 }
853
854 fn ensure_path(
855 &mut self,
856 path_id: PathId,
857 network_path: FourTuple,
858 now: Instant,
859 pn: Option<u64>,
860 ) -> &mut PathData {
861 let valid_path = self.find_validated_path_on_network_path(network_path);
862 let validated = valid_path.is_some();
863 let initial_rtt = valid_path.map(|(_, path)| path.data.rtt.conservative());
864 let vacant_entry = match self.paths.entry(path_id) {
865 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
866 btree_map::Entry::Occupied(occupied_entry) => {
867 return &mut occupied_entry.into_mut().data;
868 }
869 };
870
871 debug!(%validated, %path_id, %network_path, "path added");
872 let peer_max_udp_payload_size =
873 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
874 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
875 let mut data = PathData::new(
876 network_path,
877 self.allow_mtud,
878 Some(peer_max_udp_payload_size),
879 self.path_generation_counter,
880 now,
881 &self.config,
882 );
883
884 data.validated = validated;
885 if let Some(initial_rtt) = initial_rtt {
886 data.rtt.reset_initial_rtt(initial_rtt);
887 }
888
889 let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
890 self.timers.set(
891 Timer::PerPath(path_id, PathTimer::PathOpen),
892 now + 3 * pto,
893 self.qlog.with_time(now),
894 );
895
896 data.send_new_challenge = true;
899
900 let path = vacant_entry.insert(PathState { data, prev: None });
901
902 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
903 if let Some(pn) = pn {
904 pn_space.dedup.insert(pn);
905 }
906 self.spaces[SpaceId::Data]
907 .number_spaces
908 .insert(path_id, pn_space);
909 self.qlog.emit_tuple_assigned(path_id, network_path, now);
910 &mut path.data
911 }
912
/// Writes the next batch of outgoing datagrams into `buf` and describes it as a
/// [`Transmit`].
///
/// Returns `None` when nothing needs to be sent, or when a packet builder could not be
/// created.
///
/// - `now`: current time, used for timers, congestion control and logging.
/// - `max_datagrams`: upper bound on the number of datagrams batched in one call. It is
///   forced to one unless `enable_segmentation_offload` is set in the transport config.
/// - `buf`: buffer the datagrams are written to.
///
/// Outline:
/// 1. A pending server-side `iroh_hp` probe is sent first, as a bare 8-byte random token.
/// 2. Paths are visited in ascending path-ID order, and for path zero the packet spaces in
///    the order Initial, Handshake, Data. Packets are coalesced and batched until nothing
///    more can be sent, the path is blocked, or the datagram limit is reached. A single
///    call only ever sends on one path.
/// 3. If nothing was written and the connection is established, an MTU discovery probe is
///    attempted on the first eligible path.
///
/// As a side effect `self.app_limited` is updated to reflect whether the connection had
/// nothing to send without being blocked by the congestion check.
///
/// NOTE(review): the probe branch reports `size: 8` regardless of what `buf` already
/// contains, so callers presumably pass an empty buffer; confirm against the callers.
#[must_use]
pub fn poll_transmit(
    &mut self,
    now: Instant,
    max_datagrams: NonZeroUsize,
    buf: &mut Vec<u8>,
) -> Option<Transmit> {
    // A pending server-side probe takes priority over everything else. It consists solely
    // of a random 64-bit token, which is handed back to the probe via `finish`.
    if let Some(probing) = self
        .iroh_hp
        .server_side_mut()
        .ok()
        .and_then(iroh_hp::ServerState::next_probe)
    {
        let destination = probing.remote();
        trace!(%destination, "RAND_DATA packet");
        let token: u64 = self.rng.random();
        buf.put_u64(token);
        probing.finish(token);
        return Some(Transmit {
            destination,
            ecn: None,
            size: 8,
            segment_size: None,
            src_ip: None,
        });
    }

    // Without segmentation offload only a single datagram is produced per call.
    let max_datagrams = match self.config.enable_segmentation_offload {
        false => NonZeroUsize::MIN,
        true => max_datagrams,
    };

    // `close` is true when this call should produce a close packet. In the draining and
    // closed states nothing is sent unless `self.close` is set.
    let close = match self.state.as_type() {
        StateType::Drained => {
            self.app_limited = true;
            return None;
        }
        StateType::Draining | StateType::Closed => {
            if !self.close {
                self.app_limited = true;
                return None;
            }
            true
        }
        _ => false,
    };

    // Decide whether an ACK_FREQUENCY frame should be queued, based on the smallest RTT
    // over all paths.
    if let Some(config) = &self.config.ack_frequency_config {
        let rtt = self
            .paths
            .values()
            .map(|p| p.data.rtt.get())
            .min()
            .expect("one path exists");
        self.spaces[SpaceId::Data].pending.ack_frequency = self
            .ack_frequency
            .should_send_ack_frequency(rtt, config, &self.peer_params)
            && self.highest_space == SpaceId::Data
            && self.peer_supports_ack_frequency();
    }

    // Whether another packet may still be coalesced into the current datagram. Cleared
    // once a short-header packet has been written.
    let mut coalesce = true;

    // Padding requested for the datagram currently being built.
    let mut pad_datagram = PadDatagram::No;

    // Set when any path/space was blocked by the congestion check; feeds `app_limited`.
    let mut congestion_blocked = false;

    // Packet number of the last packet built, reported to the congestion controller.
    let mut last_packet_number = None;

    // Start with the lowest path ID.
    let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;

    // Whether some path is validated, locally `Available` and has remote CIDs. If so,
    // paths with `Backup` status only send their path-exclusive frames.
    let have_available_path = self.paths.iter().any(|(id, path)| {
        path.data.validated
            && path.data.local_status() == PathStatus::Available
            && self.rem_cids.contains_key(id)
    });

    // The segment size starts out as the current MTU of the first path.
    let mut transmit = TransmitBuf::new(
        buf,
        max_datagrams,
        self.path_data(path_id).current_mtu().into(),
    );
    if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
        return Some(challenge);
    }
    // Only path zero goes through the Initial and Handshake spaces.
    let mut space_id = match path_id {
        PathId::ZERO => SpaceId::Initial,
        _ => SpaceId::Data,
    };

    loop {
        // Pick the remote CID for this path. Without one the path cannot be used.
        let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
            let err = PathError::RemoteCidsExhausted;
            if !self.abandoned_paths.contains(&path_id) {
                debug!(?err, %path_id, "no active CID for path");
                self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                    id: path_id,
                    error: err,
                }));
                // Best effort: a failure to close the path is deliberately ignored.
                self.close_path(
                    now,
                    path_id,
                    TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
                )
                .ok();
                self.spaces[SpaceId::Data]
                    .pending
                    .path_cids_blocked
                    .insert(path_id);
            } else {
                trace!(%path_id, "remote CIDs retired for abandoned path");
            }

            // Move on to the next path, if there is one.
            match self.paths.keys().find(|&&next| next > path_id) {
                Some(next_path_id) => {
                    path_id = *next_path_id;
                    space_id = SpaceId::Data;

                    // Adopt the MTU of the new path as segment size.
                    transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                    if let Some(challenge) =
                        self.send_prev_path_challenge(now, &mut transmit, path_id)
                    {
                        return Some(challenge);
                    }

                    continue;
                }
                None => {
                    trace!(
                        ?space_id,
                        %path_id,
                        "no CIDs to send on path, no more paths"
                    );
                    break;
                }
            }
        };

        // Space available for the next packet: the remainder of the current datagram, or
        // a whole segment when a new datagram would have to be started.
        let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
            transmit.datagram_remaining_mut()
        } else {
            transmit.segment_size()
        };
        let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
        let path_should_send = {
            // A `Backup` path only sends path-exclusive frames while an available path
            // exists.
            let path_exclusive_only = space_id == SpaceId::Data
                && have_available_path
                && self.path_data(path_id).local_status() == PathStatus::Backup;
            let path_should_send = if path_exclusive_only {
                can_send.path_exclusive
            } else {
                !can_send.is_empty()
            };
            // Outstanding loss probes must be sent even if nothing else is pending.
            let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
            path_should_send || needs_loss_probe
        };

        // Nothing to send in this space: try the next space on the same path.
        if !path_should_send && space_id < SpaceId::Data {
            if self.spaces[space_id].crypto.is_some() {
                trace!(?space_id, %path_id, "nothing to send in space");
            }
            space_id = space_id.next();
            continue;
        }

        // The congestion check only runs when a new datagram would have to be started.
        let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
            self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
        } else {
            PathBlocked::No
        };
        if send_blocked != PathBlocked::No {
            trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
            congestion_blocked = true;
        }
        if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
            // A later space may still be able to send.
            space_id = space_id.next();
            continue;
        }
        if !path_should_send || send_blocked != PathBlocked::No {
            // Something was already written for this path: send that and stop, since one
            // call only transmits on a single path.
            if transmit.num_datagrams() > 0 {
                break;
            }

            // Otherwise try the next path.
            match self.paths.keys().find(|&&next| next > path_id) {
                Some(next_path_id) => {
                    trace!(
                        ?space_id,
                        %path_id,
                        %next_path_id,
                        "nothing to send on path"
                    );
                    path_id = *next_path_id;
                    space_id = SpaceId::Data;

                    // Adopt the MTU of the new path as segment size.
                    transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
                    if let Some(challenge) =
                        self.send_prev_path_challenge(now, &mut transmit, path_id)
                    {
                        return Some(challenge);
                    }

                    continue;
                }
                None => {
                    trace!(
                        ?space_id,
                        %path_id,
                        next_path_id=?None::<PathId>,
                        "nothing to send on path"
                    );
                    break;
                }
            }
        }

        // The current datagram is full (or none was started yet): start a new one.
        if transmit.datagram_remaining_mut() == 0 {
            if transmit.num_datagrams() >= transmit.max_datagrams().get() {
                // Datagram limit for this call reached.
                break;
            }

            match self.spaces[space_id].for_path(path_id).loss_probes {
                0 => transmit.start_new_datagram(),
                _ => {
                    // Queue probe contents; request an immediate ACK if the peer
                    // supports ACK frequency.
                    let request_immediate_ack =
                        space_id == SpaceId::Data && self.peer_supports_ack_frequency();
                    self.spaces[space_id].maybe_queue_probe(
                        path_id,
                        request_immediate_ack,
                        &self.streams,
                    );

                    self.spaces[space_id].for_path(path_id).loss_probes -= 1;

                    // Loss probes are limited to at most `INITIAL_MTU` bytes.
                    transmit.start_new_datagram_with_size(std::cmp::min(
                        usize::from(INITIAL_MTU),
                        transmit.segment_size(),
                    ));
                }
            }
            trace!(count = transmit.num_datagrams(), "new datagram started");
            // A fresh datagram can be coalesced into again and starts without padding.
            coalesce = true;
            pad_datagram = PadDatagram::No;
        }

        // When coalescing into a datagram that already holds data, enough room for
        // another packet must have been left.
        if transmit.datagram_start_offset() < transmit.len() {
            debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
        }

        // A client that is about to send in the Handshake space discards the Initial
        // space if it still has keys.
        if self.spaces[SpaceId::Initial].crypto.is_some()
            && space_id == SpaceId::Handshake
            && self.side.is_client()
        {
            self.discard_space(now, SpaceId::Initial);
        }
        if let Some(ref mut prev) = self.prev_crypto {
            prev.update_unacked = false;
        }

        // `?` makes the whole call return `None` if no packet builder can be created.
        let mut builder = PacketBuilder::new(
            now,
            space_id,
            path_id,
            remote_cid,
            &mut transmit,
            can_send.other,
            self,
        )?;
        last_packet_number = Some(builder.exact_number);
        // Nothing is coalesced after a short-header packet.
        coalesce = coalesce && !builder.short_header;

        // Initial packets from a client, and Initial packets carrying more than ACKs,
        // are padded to the minimum MTU.
        if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
            pad_datagram |= PadDatagram::ToMinMtu;
        }
        if space_id == SpaceId::Data && self.config.pad_to_mtu {
            pad_datagram |= PadDatagram::ToSegmentSize;
        }

        if can_send.close {
            trace!("sending CONNECTION_CLOSE");
            let is_multipath_negotiated = self.is_multipath_negotiated();
            // Include pending ACKs for every path of this space that has any. The IDs are
            // collected first because `populate_acks` needs the space mutably.
            for path_id in self.spaces[space_id]
                .number_spaces
                .iter()
                .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
                .map(|(&path_id, _)| path_id)
                .collect::<Vec<_>>()
            {
                Self::populate_acks(
                    now,
                    self.receiving_ecn,
                    path_id,
                    space_id,
                    &mut self.spaces[space_id],
                    is_multipath_negotiated,
                    &mut builder,
                    &mut self.stats.frame_tx,
                );
            }

            debug_assert!(
                builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
                "ACKs should leave space for ConnectionClose"
            );
            let stats = &mut self.stats.frame_tx;
            if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
                let max_frame_size = builder.frame_space_remaining();
                let close: Close = match self.state.as_type() {
                    StateType::Closed => {
                        let reason: Close =
                            self.state.as_closed().expect("checked").clone().into();
                        // Outside the Data space a non-transport reason is replaced by a
                        // generic application error with an empty message.
                        if space_id == SpaceId::Data || reason.is_transport_layer() {
                            reason
                        } else {
                            TransportError::APPLICATION_ERROR("").into()
                        }
                    }
                    StateType::Draining => TransportError::NO_ERROR("").into(),
                    _ => unreachable!(
                        "tried to make a close packet when the connection wasn't closed"
                    ),
                };
                builder.write_frame(close.encoder(max_frame_size), stats);
            }
            builder.finish_and_track(now, self, path_id, pad_datagram);
            if space_id == self.highest_space {
                // The close was sent in the highest space: no further close packets are
                // produced and this was the final packet of the call.
                self.close = false;
                break;
            } else {
                // Also send a close packet in the next space; nothing else is sent.
                space_id = space_id.next();
                continue;
            }
        }

        // A response for a challenge that arrived from a different network path is sent
        // on its own, in the first datagram only, to the network path it came from.
        if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
            let path = self.path_data_mut(path_id);
            if let Some((token, network_path)) =
                path.path_responses.pop_off_path(path.network_path)
            {
                let stats = &mut self.stats.frame_tx;
                let frame = frame::PathResponse(token);
                builder.write_frame_with_log_msg(frame, stats, Some("(off-path)"));
                builder.finish_and_track(now, self, path_id, PadDatagram::ToMinMtu);
                self.stats.udp_tx.on_sent(1, transmit.len());
                return Some(Transmit {
                    destination: network_path.remote,
                    size: transmit.len(),
                    ecn: None,
                    segment_size: None,
                    src_ip: network_path.local_ip,
                });
            }
        }

        // Fill the packet with frames.
        let path_exclusive_only =
            have_available_path && self.path_data(path_id).local_status() == PathStatus::Backup;
        self.populate_packet(now, space_id, path_id, path_exclusive_only, &mut builder);

        // Sanity check: if other frames were said to be sendable in a full-size segment,
        // the packet should not end up containing only ACKs.
        debug_assert!(
            !(builder.sent_frames().is_ack_only(&self.streams)
                && !can_send.acks
                && can_send.other
                && builder.buf.segment_size()
                    == self.path_data(path_id).current_mtu() as usize
                && self.datagrams.outgoing.is_empty()),
            "SendableFrames was {can_send:?}, but only ACKs have been written"
        );
        if builder.sent_frames().requires_padding {
            pad_datagram |= PadDatagram::ToMinMtu;
        }

        // For every path whose ACKs were included, record that they were sent and stop
        // its max-ACK-delay timer.
        for (path_id, _pn) in builder.sent_frames().largest_acked.iter() {
            self.spaces[space_id]
                .for_path(*path_id)
                .pending_acks
                .acks_sent();
            self.timers.stop(
                Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
                self.qlog.with_time(now),
            );
        }

        // Finish the packet. It is left unpadded when another packet will be coalesced
        // into the same datagram; otherwise it is the last packet of this datagram.
        if coalesce
            && builder
                .buf
                .datagram_remaining_mut()
                .saturating_sub(builder.predict_packet_end())
                > MIN_PACKET_SPACE
            && self
                .next_send_space(space_id, path_id, builder.buf, close)
                .is_some()
        {
            builder.finish_and_track(now, self, path_id, PadDatagram::No);
        } else {
            if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
                // Upper bound on the padding accepted to keep a non-first datagram at
                // full segment size.
                const MAX_PADDING: usize = 32;
                if builder.buf.datagram_remaining_mut()
                    > builder.predict_packet_end() + MAX_PADDING
                {
                    // Too much padding would be needed: finish the packet unpadded and
                    // end the batch with this shorter datagram.
                    trace!(
                        "GSO truncated by demand for {} padding bytes",
                        builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
                    );
                    builder.finish_and_track(now, self, path_id, PadDatagram::No);
                    break;
                }

                // Pad this datagram to the full segment size.
                builder.finish_and_track(now, self, path_id, PadDatagram::ToSegmentSize);
            } else {
                builder.finish_and_track(now, self, path_id, pad_datagram);
            }
            // After the first datagram is finished its size is clipped via
            // `clip_datagram_size`.
            if transmit.num_datagrams() == 1 {
                transmit.clip_datagram_size();
            }
        }
    }

    // Report everything written in this call to the congestion controller of the path
    // that was last used.
    if let Some(last_packet_number) = last_packet_number {
        self.path_data_mut(path_id).congestion.on_sent(
            now,
            transmit.len() as u64,
            last_packet_number,
        );
    }

    self.qlog.emit_recovery_metrics(
        path_id,
        &mut self.paths.get_mut(&path_id).unwrap().data,
        now,
    );

    // Nothing to send without having been blocked by the congestion check.
    self.app_limited = transmit.is_empty() && !congestion_blocked;

    // Nothing else to send: try an MTU discovery probe.
    if transmit.is_empty() && self.state.is_established() {
        let space_id = SpaceId::Data;
        // Search for the first eligible path, starting from the lowest path ID again.
        path_id = *self.paths.first_key_value().expect("one path must exist").0;
        let probe_data = loop {
            let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
            // Only paths that are validated, not being validated right now and not
            // abandoned may be probed.
            let eligible = self.path_data(path_id).validated
                && !self.path_data(path_id).is_validating_path()
                && !self.abandoned_paths.contains(&path_id);
            let probe_size = eligible
                .then(|| {
                    let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
                    self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
                })
                .flatten();
            match (active_cid, probe_size) {
                (Some(active_cid), Some(probe_size)) => {
                    break Some((active_cid, probe_size));
                }
                _ => {
                    // Not possible on this path: try the next one.
                    match self.paths.keys().find(|&&next| next > path_id) {
                        Some(next) => {
                            path_id = *next;
                            continue;
                        }
                        None => break None,
                    }
                }
            }
        };
        if let Some((active_cid, probe_size)) = probe_data {
            debug_assert_eq!(transmit.num_datagrams(), 0);
            transmit.start_new_datagram_with_size(probe_size as usize);

            let mut builder = PacketBuilder::new(
                now,
                space_id,
                path_id,
                active_cid,
                &mut transmit,
                true,
                self,
            )?;

            // The probe is a PING padded to the probed size.
            trace!(?probe_size, "writing MTUD probe");
            builder.write_frame(frame::Ping, &mut self.stats.frame_tx);

            // Ask for an immediate ACK if the peer supports ACK frequency.
            if self.peer_supports_ack_frequency() {
                builder.write_frame(frame::ImmediateAck, &mut self.stats.frame_tx);
            }

            builder.finish_and_track(now, self, path_id, PadDatagram::ToSize(probe_size));

            self.path_stats
                .entry(path_id)
                .or_default()
                .sent_plpmtud_probes += 1;
        }
    }

    if transmit.is_empty() {
        return None;
    }

    // Everything in `transmit` was written for `path_id`.
    let network_path = self.path_data(path_id).network_path;
    trace!(
        segment_size = transmit.segment_size(),
        last_datagram_len = transmit.len() % transmit.segment_size(),
        %network_path,
        "sending {} bytes in {} datagrams",
        transmit.len(),
        transmit.num_datagrams()
    );
    self.path_data_mut(path_id)
        .inc_total_sent(transmit.len() as u64);

    self.stats
        .udp_tx
        .on_sent(transmit.num_datagrams() as u64, transmit.len());

    Some(Transmit {
        destination: network_path.remote,
        size: transmit.len(),
        ecn: if self.path_data(path_id).sending_ecn {
            Some(EcnCodepoint::Ect0)
        } else {
            None
        },
        // A segment size is only reported for batches of more than one datagram.
        segment_size: match transmit.num_datagrams() {
            1 => None,
            _ => Some(transmit.segment_size()),
        },
        src_ip: network_path.local_ip,
    })
}
1574
1575 fn next_send_space(
1580 &mut self,
1581 current_space_id: SpaceId,
1582 path_id: PathId,
1583 buf: &TransmitBuf<'_>,
1584 close: bool,
1585 ) -> Option<SpaceId> {
1586 let mut space_id = current_space_id;
1593 loop {
1594 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1595 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1596 return Some(space_id);
1597 }
1598 space_id = match space_id {
1599 SpaceId::Initial => SpaceId::Handshake,
1600 SpaceId::Handshake => SpaceId::Data,
1601 SpaceId::Data => break,
1602 }
1603 }
1604 None
1605 }
1606
1607 fn path_congestion_check(
1609 &mut self,
1610 space_id: SpaceId,
1611 path_id: PathId,
1612 transmit: &TransmitBuf<'_>,
1613 can_send: &SendableFrames,
1614 now: Instant,
1615 ) -> PathBlocked {
1616 if self.side().is_server()
1622 && self
1623 .path_data(path_id)
1624 .anti_amplification_blocked(transmit.len() as u64 + 1)
1625 {
1626 trace!(?space_id, %path_id, "blocked by anti-amplification");
1627 return PathBlocked::AntiAmplification;
1628 }
1629
1630 let bytes_to_send = transmit.segment_size() as u64;
1633 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1634
1635 if can_send.other && !need_loss_probe && !can_send.close {
1636 let path = self.path_data(path_id);
1637 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1638 trace!(?space_id, %path_id, "blocked by congestion control");
1639 return PathBlocked::Congestion;
1640 }
1641 }
1642
1643 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1645 self.timers.set(
1646 Timer::PerPath(path_id, PathTimer::Pacing),
1647 delay,
1648 self.qlog.with_time(now),
1649 );
1650 trace!(?space_id, %path_id, "blocked by pacing");
1653 return PathBlocked::Pacing;
1654 }
1655
1656 PathBlocked::No
1657 }
1658
    /// Sends a PATH_CHALLENGE on the previous network path of `path_id`, if one is requested.
    ///
    /// Returns `None` when the path is unknown, when it has no previous path recorded, when
    /// that previous path has no new challenge requested, or when the packet builder could
    /// not be created. Otherwise a single packet carrying the challenge is written into
    /// `buf` and the [`Transmit`] describing it is returned.
    fn send_prev_path_challenge(
        &mut self,
        now: Instant,
        buf: &mut TransmitBuf<'_>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
        if !prev_path.send_new_challenge {
            return None;
        };
        // Consume the request before sending so the challenge is only sent once per request.
        prev_path.send_new_challenge = false;
        let network_path = prev_path.network_path;
        // Fresh random token; recorded so a matching response can be correlated later.
        let token = self.rng.random();
        let info = paths::SentChallengeInfo {
            sent_instant: now,
            network_path,
        };
        prev_path.challenges_sent.insert(token, info);
        debug_assert_eq!(
            self.highest_space,
            SpaceId::Data,
            "PATH_CHALLENGE queued without 1-RTT keys"
        );
        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);

        // This must be the first datagram in the buffer.
        debug_assert_eq!(buf.datagram_start_offset(), 0);
        let mut builder =
            PacketBuilder::new(now, SpaceId::Data, path_id, *prev_cid, buf, false, self)?;
        let challenge = frame::PathChallenge(token);
        let stats = &mut self.stats.frame_tx;
        builder.write_frame_with_log_msg(challenge, stats, Some("validating previous path"));

        // Pad the packet up to `MIN_INITIAL_SIZE`.
        builder.pad_to(MIN_INITIAL_SIZE);

        builder.finish(self, now);
        self.stats.udp_tx.on_sent(1, buf.len());

        // The challenge goes to the previous path's addresses; no ECN and no segmentation.
        Some(Transmit {
            destination: network_path.remote,
            size: buf.len(),
            ecn: None,
            segment_size: None,
            src_ip: network_path.local_ip,
        })
    }
1719
1720 fn space_can_send(
1725 &mut self,
1726 space_id: SpaceId,
1727 path_id: PathId,
1728 packet_size: usize,
1729 close: bool,
1730 ) -> SendableFrames {
1731 let pn = self.spaces[SpaceId::Data]
1732 .for_path(path_id)
1733 .peek_tx_number();
1734 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1735 if self.spaces[space_id].crypto.is_none()
1736 && (space_id != SpaceId::Data
1737 || self.zero_rtt_crypto.is_none()
1738 || self.side.is_server())
1739 {
1740 return SendableFrames::empty();
1742 }
1743 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1744 if space_id == SpaceId::Data {
1745 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1746 }
1747
1748 can_send.close = close && self.spaces[space_id].crypto.is_some();
1749
1750 can_send
1751 }
1752
    /// Processes a [`ConnectionEvent`] delivered to this connection.
    ///
    /// Two kinds of events are handled here:
    ///
    /// - `Datagram`: an incoming UDP datagram for `path_id`. It is dropped early if
    ///   [`Self::update_network_path_or_discard`] says so; otherwise receive statistics are
    ///   updated and the first packet plus any coalesced remainder are processed.
    /// - `NewIdentifiers`: newly issued local connection IDs, which are recorded in the
    ///   per-path CID state and queued for announcement in the Data space.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                network_path,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();

                if self.update_network_path_or_discard(network_path, path_id) {
                    return;
                }

                // Sample the anti-amplification state before processing; a path that is not
                // known yet is treated as blocked.
                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    .unwrap_or(true); self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let data_len = first_decode.len();

                self.handle_decode(now, network_path, path_id, ecn, first_decode);
                // The path is looked up only after `handle_decode`, so the received bytes are
                // credited to whatever path state exists for `path_id` at that point (if any).
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                // Remaining bytes of the datagram hold coalesced packets.
                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, network_path, path_id, ecn, data);
                }

                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                // Re-arm loss detection if the path was blocked before this datagram arrived.
                if was_anti_amplification_blocked {
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                // All ids in one event are expected to belong to the same path.
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                // Queue the ids in reverse order onto the pending list.
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                self.reset_cid_retirement(now);
            }
        }
    }
1830
    /// Validates the 4-tuple of an incoming datagram against the known path state.
    ///
    /// Returns `true` if the datagram must be discarded, `false` if processing may go on.
    /// Unknown paths are never discarded here. For a known path the datagram is discarded
    /// when:
    ///
    /// - the remote address differs and the remote is not allowed to migrate, or
    /// - both local IPs are known, they differ, and this side is not a client.
    ///
    /// When the datagram is accepted and carries a local IP, that IP is stored on the path.
    fn update_network_path_or_discard(&mut self, network_path: FourTuple, path_id: PathId) -> bool {
        let remote_may_migrate = self.side.remote_may_migrate(&self.state);
        // Only the client side tolerates a change of its own local IP.
        let local_ip_may_migrate = self.side.is_client();
        if let Some(known_path) = self.path_mut(path_id) {
            if network_path.remote != known_path.network_path.remote && !remote_may_migrate {
                trace!(
                    %path_id,
                    %network_path,
                    %known_path.network_path,
                    "discarding packet from unrecognized peer"
                );
                return true;
            }

            if known_path.network_path.local_ip.is_some()
                && network_path.local_ip.is_some()
                && known_path.network_path.local_ip != network_path.local_ip
                && !local_ip_may_migrate
            {
                trace!(
                    %path_id,
                    %network_path,
                    %known_path.network_path,
                    "discarding packet sent to incorrect interface"
                );
                return true;
            }
            // Accepted: record the local IP, logging when it changed from a known value.
            if let Some(local_ip) = network_path.local_ip {
                if known_path
                    .network_path
                    .local_ip
                    .is_some_and(|ip| ip != local_ip)
                {
                    debug!(
                        %path_id,
                        %network_path,
                        %known_path.network_path,
                        "path's local address seemingly migrated"
                    );
                }
                known_path.network_path.local_ip = Some(local_ip);
            }
        }
        false
    }
1893
    /// Processes every timer that has expired by `now`.
    ///
    /// Expired timers are drained one at a time and dispatched on their kind:
    /// connection-wide timers ([`ConnTimer`]) and per-path timers ([`PathTimer`]).
    pub fn handle_timeout(&mut self, now: Instant) {
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            trace!(?timer, at=?now, "timeout");
            match timer {
                Timer::Conn(timer) => match timer {
                    // The close timer ran out: move to drained and tell the endpoint.
                    ConnTimer::Close => {
                        self.state.move_to_drained(None);
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    ConnTimer::Idle => {
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        trace!("sending keep-alive");
                        self.ping();
                    }
                    // Drop the 0-RTT keys and the previous key generation.
                    ConnTimer::KeyDiscard => {
                        self.zero_rtt_crypto = None;
                        self.prev_crypto = None;
                    }
                    ConnTimer::PushNewCid => {
                        // Handle every CID retirement that is due by `now`.
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    // Only request replacement CIDs while not closed.
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                },
                Timer::PerPath(path_id, timer) => {
                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
                    let _guard = span.enter();
                    match timer {
                        // Idle path: close it with NO_ERROR, ignoring a failure to do so.
                        PathTimer::PathIdle => {
                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
                                .ok();
                        }

                        PathTimer::PathKeepAlive => {
                            trace!("sending keep-alive on path");
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            // NOTE(review): this unwrap assumes the path still exists after
                            // the loss-detection handler ran — confirm.
                            self.qlog.emit_recovery_metrics(
                                path_id,
                                &mut self.paths.get_mut(&path_id).unwrap().data,
                                now,
                            );
                        }
                        PathTimer::PathValidation => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path validation failed");
                            // Fall back to the previous path data, if any was kept.
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            // Forget outstanding challenges and do not send new ones.
                            path.data.challenges_sent.clear();
                            path.data.send_new_challenge = false;
                        }
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!("path challenge deemed lost");
                            // Request a fresh challenge to be sent.
                            path.data.send_new_challenge = true;
                        }
                        PathTimer::PathOpen => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            path.data.challenges_sent.clear();
                            path.data.send_new_challenge = false;
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("new path validation failed");
                            if let Err(err) = self.close_path(
                                now,
                                path_id,
                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
                            ) {
                                warn!(?err, "failed closing path");
                            }

                            // The application is notified whether or not closing succeeded.
                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                                id: path_id,
                                error: PathError::ValidationFailed,
                            }));
                        }
                        // Nothing to do here beyond logging.
                        PathTimer::Pacing => trace!("pacing timer expired"),
                        PathTimer::MaxAckDelay => {
                            trace!("max ack delay reached");
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::DiscardPath => {
                            // Stop all remaining timers of this path first.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            // Ask the endpoint to retire every local CID of the path.
                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
                                let (min_seq, max_seq) = loc_cid_state.active_seq();
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }
2050
2051 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2063 self.close_inner(
2064 now,
2065 Close::Application(frame::ApplicationClose { error_code, reason }),
2066 )
2067 }
2068
2069 fn close_inner(&mut self, now: Instant, reason: Close) {
2070 let was_closed = self.state.is_closed();
2071 if !was_closed {
2072 self.close_common();
2073 self.set_close_timer(now);
2074 self.close = true;
2075 self.state.move_to_closed_local(reason);
2076 }
2077 }
2078
    /// Returns a [`Datagrams`] handle that mutably borrows this connection.
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }
2083
    /// Returns a clone of the connection's statistics.
    pub fn stats(&mut self) -> ConnectionStats {
        self.stats.clone()
    }
2088
2089 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2091 let path = self.paths.get(&path_id)?;
2092 let stats = self.path_stats.entry(path_id).or_default();
2093 stats.rtt = path.data.rtt.get();
2094 stats.cwnd = path.data.congestion.window();
2095 stats.current_mtu = path.data.mtud.current_mtu();
2096 Some(*stats)
2097 }
2098
2099 pub fn ping(&mut self) {
2103 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2106 path_data.ping_pending = true;
2107 }
2108 }
2109
2110 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2114 let path_data = self.spaces[self.highest_space]
2115 .number_spaces
2116 .get_mut(&path)
2117 .ok_or(ClosedPath { _private: () })?;
2118 path_data.ping_pending = true;
2119 Ok(())
2120 }
2121
2122 pub fn force_key_update(&mut self) {
2126 if !self.state.is_established() {
2127 debug!("ignoring forced key update in illegal state");
2128 return;
2129 }
2130 if self.prev_crypto.is_some() {
2131 debug!("ignoring redundant forced key update");
2134 return;
2135 }
2136 self.update_keys(None, false);
2137 }
2138
2139 pub fn crypto_session(&self) -> &dyn crypto::Session {
2141 &*self.crypto
2142 }
2143
    /// Whether the connection state is still in the handshake phase.
    pub fn is_handshaking(&self) -> bool {
        self.state.is_handshake()
    }
2151
    /// Whether the connection state reports being closed.
    pub fn is_closed(&self) -> bool {
        self.state.is_closed()
    }
2162
    /// Whether the connection state reports being drained.
    pub fn is_drained(&self) -> bool {
        self.state.is_drained()
    }
2170
    /// Returns the stored `accepted_0rtt` flag.
    // NOTE(review): presumably set once the handshake decides whether 0-RTT data was
    // accepted — confirm where the flag is written.
    pub fn accepted_0rtt(&self) -> bool {
        self.accepted_0rtt
    }
2177
    /// Returns the stored `zero_rtt_enabled` flag.
    pub fn has_0rtt(&self) -> bool {
        self.zero_rtt_enabled
    }
2182
2183 pub fn has_pending_retransmits(&self) -> bool {
2185 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2186 }
2187
    /// Returns which [`Side`] of the connection this is.
    pub fn side(&self) -> Side {
        self.side.side()
    }
2192
2193 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2195 self.path(path_id)
2196 .map(|path_data| {
2197 path_data
2198 .last_observed_addr_report
2199 .as_ref()
2200 .map(|observed| observed.socket_addr())
2201 })
2202 .ok_or(ClosedPath { _private: () })
2203 }
2204
2205 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2207 self.path(path_id).map(|d| d.rtt.get())
2208 }
2209
2210 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2212 self.path(path_id).map(|d| d.congestion.as_ref())
2213 }
2214
2215 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2220 self.streams.set_max_concurrent(dir, count);
2221 let pending = &mut self.spaces[SpaceId::Data].pending;
2224 self.streams.queue_max_stream_id(pending);
2225 }
2226
2227 pub fn set_max_concurrent_paths(
2237 &mut self,
2238 now: Instant,
2239 count: NonZeroU32,
2240 ) -> Result<(), MultipathNotNegotiated> {
2241 if !self.is_multipath_negotiated() {
2242 return Err(MultipathNotNegotiated { _private: () });
2243 }
2244 self.max_concurrent_paths = count;
2245
2246 let in_use_count = self
2247 .local_max_path_id
2248 .next()
2249 .saturating_sub(self.abandoned_paths.len() as u32)
2250 .as_u32();
2251 let extra_needed = count.get().saturating_sub(in_use_count);
2252 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2253
2254 self.set_max_path_id(now, new_max_path_id);
2255
2256 Ok(())
2257 }
2258
2259 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2261 if max_path_id <= self.local_max_path_id {
2262 return;
2263 }
2264
2265 self.local_max_path_id = max_path_id;
2266 self.spaces[SpaceId::Data].pending.max_path_id = true;
2267
2268 self.issue_first_path_cids(now);
2269 }
2270
    /// Returns the stream state's current concurrency limit for direction `dir`.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }
2279
    /// Forwards `send_window` to the stream state as the new send window.
    pub fn set_send_window(&mut self, send_window: u64) {
        self.streams.set_send_window(send_window);
    }
2284
2285 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2287 if self.streams.set_receive_window(receive_window) {
2288 self.spaces[SpaceId::Data].pending.max_data = true;
2289 }
2290 }
2291
2292 pub fn is_multipath_negotiated(&self) -> bool {
2297 !self.is_handshaking()
2298 && self.config.max_concurrent_multipath_paths.is_some()
2299 && self.peer_params.initial_max_path_id.is_some()
2300 }
2301
2302 fn on_ack_received(
2303 &mut self,
2304 now: Instant,
2305 space: SpaceId,
2306 ack: frame::Ack,
2307 ) -> Result<(), TransportError> {
2308 let path = PathId::ZERO;
2310 self.inner_on_ack_received(now, space, path, ack)
2311 }
2312
2313 fn on_path_ack_received(
2314 &mut self,
2315 now: Instant,
2316 space: SpaceId,
2317 path_ack: frame::PathAck,
2318 ) -> Result<(), TransportError> {
2319 let (ack, path) = path_ack.into_ack();
2320 self.inner_on_ack_received(now, space, path, ack)
2321 }
2322
    /// Processes an acknowledgement for packets sent in `space` on `path`.
    ///
    /// Acknowledgements for abandoned paths are ignored. Otherwise this updates the
    /// largest acknowledged packet, collects the newly acknowledged packet numbers,
    /// notifies MTU discovery, ACK frequency and congestion control, takes an RTT sample
    /// when appropriate, runs loss detection, processes ECN feedback and re-arms the loss
    /// detection timer.
    ///
    /// # Errors
    ///
    /// Returns `PROTOCOL_VIOLATION` if the peer acknowledges a packet number that has not
    /// been sent yet, and propagates any error from checking an individual ACK range.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if self.abandoned_paths.contains(&path) {
            trace!("silently ignoring PATH_ACK on abandoned path");
            return Ok(());
        }
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // Track whether this ACK advances the largest acknowledged packet number.
        let new_largest = {
            let space = &mut self.spaces[space].for_path(path);
            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
                space.largest_acked_packet = Some(ack.largest);
                // Remember the send time of that packet if it is still tracked.
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        if self.detect_spurious_loss(&ack, space, path) {
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Only packets that are still tracked as sent count as newly acknowledged.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                // The acknowledged packet itself carried ACKs; everything below the largest
                // packet number it acknowledged per path no longer needs to be acknowledged.
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                let path_data = self.path_data_mut(path);
                // Let MTU discovery see the ack; tell congestion control if the MTU changed.
                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, info);
            }
        }

        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
        let app_limited = self.app_limited;
        let path_data = self.path_data_mut(path);
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_ackd);

        // An RTT sample is only taken when the largest acknowledged packet advanced and at
        // least one newly acknowledged packet was ack-eliciting.
        if new_largest && ack_eliciting_acked {
            // Outside the Data space the ack delay is taken as zero; in the Data space the
            // reported delay is scaled by the peer's exponent and capped at its max ack delay.
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space].for_path(path).largest_acked_packet_sent,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            path_data.rtt.update(ack_delay, rtt);
            // Record the first packet number to be sent after the first RTT sample.
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
            }
        }

        // Runs before `pto_count` is reset below.
        self.detect_lost_packets(now, space, path, true);

        if self.peer_completed_address_validation(path) {
            self.path_data_mut(path).pto_count = 0;
        }

        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // ECN counts are only examined on ACKs that advance the largest acked packet.
                if new_largest {
                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // An ACK without ECN counts turns ECN off for this path.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }
2472
2473 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2474 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2475
2476 if lost_packets.is_empty() {
2477 return false;
2478 }
2479
2480 for range in ack.iter() {
2481 let spurious_losses: Vec<u64> = lost_packets
2482 .iter_range(range.clone())
2483 .map(|(pn, _info)| pn)
2484 .collect();
2485
2486 for pn in spurious_losses {
2487 lost_packets.remove(pn);
2488 }
2489 }
2490
2491 lost_packets.is_empty()
2496 }
2497
2498 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2503 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2504
2505 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2506 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2507 }
2508
2509 fn process_ecn(
2511 &mut self,
2512 now: Instant,
2513 space: SpaceId,
2514 path: PathId,
2515 newly_acked: u64,
2516 ecn: frame::EcnCounts,
2517 largest_sent_time: Instant,
2518 ) {
2519 match self.spaces[space]
2520 .for_path(path)
2521 .detect_ecn(newly_acked, ecn)
2522 {
2523 Err(e) => {
2524 debug!("halting ECN due to verification failure: {}", e);
2525
2526 self.path_data_mut(path).sending_ecn = false;
2527 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2530 }
2531 Ok(false) => {}
2532 Ok(true) => {
2533 self.path_stats.entry(path).or_default().congestion_events += 1;
2534 self.path_data_mut(path).congestion.on_congestion_event(
2535 now,
2536 largest_sent_time,
2537 false,
2538 true,
2539 0,
2540 );
2541 }
2542 }
2543 }
2544
    /// Updates path and stream state for a single packet that was just acknowledged.
    ///
    /// The packet is removed from the path's in-flight accounting, the congestion
    /// controller is informed (under the conditions below), and the stream state learns
    /// about acknowledged resets and stream frames.
    ///
    /// # Panics
    ///
    /// Panics if `path_id` is not a known path.
    fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
        self.paths
            .get_mut(&path_id)
            .expect("known path")
            .remove_in_flight(&info);
        let app_limited = self.app_limited;
        let path = self.path_data_mut(path_id);
        // The congestion controller only sees acks of ack-eliciting packets, and only
        // while the path is not being validated.
        if info.ack_eliciting && !path.is_validating_path() {
            let rtt = path.rtt;
            path.congestion
                .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
        }

        // Stream resets carried by this packet are now confirmed.
        if let Some(retransmits) = info.retransmits.get() {
            for (id, _) in retransmits.reset_stream.iter() {
                self.streams.reset_acked(*id);
            }
        }

        // So are the stream frames it carried.
        for frame in info.stream_frames {
            self.streams.received_ack_of(frame);
        }
    }
2573
2574 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2575 let start = if self.zero_rtt_crypto.is_some() {
2576 now
2577 } else {
2578 self.prev_crypto
2579 .as_ref()
2580 .expect("no previous keys")
2581 .end_packet
2582 .as_ref()
2583 .expect("update not acknowledged yet")
2584 .1
2585 };
2586
2587 self.timers.set(
2589 Timer::Conn(ConnTimer::KeyDiscard),
2590 start + self.pto_max_path(space, false) * 3,
2591 self.qlog.with_time(now),
2592 );
2593 }
2594
2595 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2608 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2609 self.detect_lost_packets(now, pn_space, path_id, false);
2611 self.set_loss_detection_timer(now, path_id);
2612 return;
2613 }
2614
2615 let (_, space) = match self.pto_time_and_space(now, path_id) {
2616 Some(x) => x,
2617 None => {
2618 error!(%path_id, "PTO expired while unset");
2619 return;
2620 }
2621 };
2622 trace!(
2623 in_flight = self.path_data(path_id).in_flight.bytes,
2624 count = self.path_data(path_id).pto_count,
2625 ?space,
2626 %path_id,
2627 "PTO fired"
2628 );
2629
2630 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2631 0 => {
2634 debug_assert!(!self.peer_completed_address_validation(path_id));
2635 1
2636 }
2637 _ => 2,
2639 };
2640 let pns = self.spaces[space].for_path(path_id);
2641 pns.loss_probes = pns.loss_probes.saturating_add(count);
2642 let path_data = self.path_data_mut(path_id);
2643 path_data.pto_count = path_data.pto_count.saturating_add(1);
2644 self.set_loss_detection_timer(now, path_id);
2645 }
2646
    /// Detects packets sent in `pn_space` on `path_id` that should now be declared lost.
    ///
    /// Every tracked packet below the largest acknowledged one is examined. A packet is
    /// declared lost if it was sent at least `loss_delay` ago, or if the largest
    /// acknowledged packet number is at least `packet_threshold` ahead of it. Packets not
    /// yet lost determine the space's next `loss_time`. The results are handed to
    /// [`Self::handle_lost_packets`].
    ///
    /// `due_to_ack` gates the persistent congestion detection: it is only evaluated when
    /// this is `true`.
    ///
    /// # Panics
    ///
    /// Panics if no packet has been acknowledged yet for this path in `pn_space`.
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        // Reset; recomputed below from the first packet that is not yet lost.
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Time threshold for loss, never below the timer granularity.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // Minimum span between lost packets for persistent congestion to be declared.
        let congestion_period = self
            .pto(SpaceId::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            // A gap in the tracked packet numbers restarts the persistent congestion window.
            if prev_packet != Some(packet.wrapping_sub(1)) {
                persistent_congestion_start = None;
            }

            // Compare elapsed time instead of subtracting `loss_delay` from `now`.
            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                if Some(packet) == in_flight_mtu_probe {
                    // A lost MTU probe is tracked separately from ordinary lost packets.
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Lost packets span more than the congestion period.
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // The window may only start after the first RTT sample.
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space, packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // Not lost yet: the first such packet in iteration order sets the loss time.
                if space.loss_time.is_none() {
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
2763
    /// Drops all state kept for `path_id` and reports the path as abandoned.
    ///
    /// Every packet still tracked as sent on the path in the Data space is treated as
    /// lost (the in-flight MTU probe is passed separately). Afterwards the path, its Data
    /// packet number space and its statistics are removed, and a [`PathEvent::Abandoned`]
    /// carrying the final statistics is queued.
    fn discard_path(&mut self, path_id: PathId, now: Instant) {
        trace!(%path_id, "dropping path state");
        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();

        // Collect all outstanding packet numbers except the MTU probe, summing their sizes.
        let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
            .for_path(path_id)
            .sent_packets
            .iter()
            .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
            .map(|(pn, info)| {
                size_of_lost_packets += info.size as u64;
                pn
            })
            .collect();

        if !lost_pns.is_empty() {
            trace!(
                %path_id,
                count = lost_pns.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost on path abandon"
            );
            // Zero loss delay and no persistent congestion are reported here.
            self.handle_lost_packets(
                SpaceId::Data,
                path_id,
                now,
                lost_pns,
                in_flight_mtu_probe,
                Duration::ZERO,
                false,
                size_of_lost_packets,
            );
        }
        self.paths.remove(&path_id);
        self.spaces[SpaceId::Data].number_spaces.remove(&path_id);

        // Hand the final statistics to the application along with the event.
        let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
        self.events.push_back(
            PathEvent::Abandoned {
                id: path_id,
                path_stats,
            }
            .into(),
        );
    }
2812
    /// Processes packets that were declared lost on `path_id` in `pn_space`.
    ///
    /// `lost_packets` must be sorted in ascending order. For each lost packet its frames
    /// are queued for retransmission, in-flight accounting is updated and the packet is
    /// remembered in the space's `lost_packets`. Black hole detection, statistics and the
    /// congestion controller are updated as well. A lost MTU probe (`lost_mtu_probe`) is
    /// handled separately and does not go through the congestion controller here.
    ///
    /// # Panics
    ///
    /// Panics if the largest lost packet, or the lost MTU probe, is no longer tracked as
    /// sent, or if `path_id` is not a known path.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        // Expire old entries from the deemed-lost set before adding new ones.
        self.drain_lost_packets(now, pn_space, path_id);

        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Snapshot used below to tell whether any in-flight bytes were lost.
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the packet's contents for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Remember the packet so a late ACK can be recognised as a spurious loss.
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                // Drop queued datagrams that no longer fit; unblock senders if that helped.
                if let Some(max_datagram_size) = self.datagrams().max_size() {
                    if self.datagrams.drop_oversized(max_datagram_size)
                        && self.datagrams.send_blocked
                    {
                        self.datagrams.send_blocked = false;
                        self.events.push_back(Event::DatagramsUnblocked);
                    }
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // A congestion event is only raised if the in-flight byte count changed.
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        // The MTU probe is always looked up in the Data space.
        if let Some(packet) = lost_mtu_probe {
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap(); self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }
2935
2936 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2942 SpaceId::iter()
2943 .filter_map(|id| {
2944 self.spaces[id]
2945 .number_spaces
2946 .get(&path_id)
2947 .and_then(|pns| pns.loss_time)
2948 .map(|time| (time, id))
2949 })
2950 .min_by_key(|&(time, _)| time)
2951 }
2952
2953 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2955 let path = self.path(path_id)?;
2956 let pto_count = path.pto_count;
2957 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2958 let mut duration = path.rtt.pto_base() * backoff;
2959
2960 if path_id == PathId::ZERO
2961 && path.in_flight.ack_eliciting == 0
2962 && !self.peer_completed_address_validation(PathId::ZERO)
2963 {
2964 let space = match self.highest_space {
2970 SpaceId::Handshake => SpaceId::Handshake,
2971 _ => SpaceId::Initial,
2972 };
2973
2974 return Some((now + duration, space));
2975 }
2976
2977 let mut result = None;
2978 for space in SpaceId::iter() {
2979 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2980 continue;
2981 };
2982
2983 if !pns.has_in_flight() {
2984 continue;
2985 }
2986 if space == SpaceId::Data {
2987 if self.is_handshaking() {
2989 return result;
2990 }
2991 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2993 }
2994 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
2995 continue;
2996 };
2997 let pto = last_ack_eliciting + duration;
2998 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2999 if path.anti_amplification_blocked(1) {
3000 continue;
3002 }
3003 if path.in_flight.ack_eliciting == 0 {
3004 continue;
3006 }
3007 result = Some((pto, space));
3008 }
3009 }
3010 result
3011 }
3012
3013 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3014 if self.side.is_server() || self.state.is_closed() {
3016 return true;
3017 }
3018 self.spaces[SpaceId::Handshake]
3021 .path_space(PathId::ZERO)
3022 .and_then(|pns| pns.largest_acked_packet)
3023 .is_some()
3024 || self.spaces[SpaceId::Data]
3025 .path_space(path)
3026 .and_then(|pns| pns.largest_acked_packet)
3027 .is_some()
3028 || (self.spaces[SpaceId::Data].crypto.is_some()
3029 && self.spaces[SpaceId::Handshake].crypto.is_none())
3030 }
3031
3032 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3040 if self.state.is_closed() {
3041 return;
3045 }
3046
3047 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3048 self.timers.set(
3050 Timer::PerPath(path_id, PathTimer::LossDetection),
3051 loss_time,
3052 self.qlog.with_time(now),
3053 );
3054 return;
3055 }
3056
3057 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3060 self.timers.set(
3061 Timer::PerPath(path_id, PathTimer::LossDetection),
3062 timeout,
3063 self.qlog.with_time(now),
3064 );
3065 } else {
3066 self.timers.stop(
3067 Timer::PerPath(path_id, PathTimer::LossDetection),
3068 self.qlog.with_time(now),
3069 );
3070 }
3071 }
3072
3073 fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3079 match space {
3080 SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3081 SpaceId::Data => self
3082 .paths
3083 .iter()
3084 .filter_map(|(path_id, state)| {
3085 if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3086 None
3088 } else {
3089 let pto = self.pto(space, *path_id);
3090 Some(pto)
3091 }
3092 })
3093 .max()
3094 .expect("there should be one at least path"),
3095 }
3096 }
3097
3098 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3103 let max_ack_delay = match space {
3104 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3105 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3106 };
3107 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3108 }
3109
3110 fn on_packet_authenticated(
3111 &mut self,
3112 now: Instant,
3113 space_id: SpaceId,
3114 path_id: PathId,
3115 ecn: Option<EcnCodepoint>,
3116 packet: Option<u64>,
3117 spin: bool,
3118 is_1rtt: bool,
3119 ) {
3120 self.total_authed_packets += 1;
3121 if let Some(last_allowed_receive) = self
3122 .paths
3123 .get(&path_id)
3124 .and_then(|path| path.data.last_allowed_receive)
3125 {
3126 if now > last_allowed_receive {
3127 warn!("received data on path which we abandoned more than 3 * PTO ago");
3128 if !self.state.is_closed() {
3130 self.state.move_to_closed(TransportError::NO_ERROR(
3132 "peer failed to respond with PATH_ABANDON in time",
3133 ));
3134 self.close_common();
3135 self.set_close_timer(now);
3136 self.close = true;
3137 }
3138 return;
3139 }
3140 }
3141
3142 self.reset_keep_alive(path_id, now);
3143 self.reset_idle_timeout(now, space_id, path_id);
3144 self.permit_idle_reset = true;
3145 self.receiving_ecn |= ecn.is_some();
3146 if let Some(x) = ecn {
3147 let space = &mut self.spaces[space_id];
3148 space.for_path(path_id).ecn_counters += x;
3149
3150 if x.is_ce() {
3151 space
3152 .for_path(path_id)
3153 .pending_acks
3154 .set_immediate_ack_required();
3155 }
3156 }
3157
3158 let packet = match packet {
3159 Some(x) => x,
3160 None => return,
3161 };
3162 match &self.side {
3163 ConnectionSide::Client { .. } => {
3164 if space_id == SpaceId::Handshake {
3168 if let Some(hs) = self.state.as_handshake_mut() {
3169 hs.allow_server_migration = false;
3170 }
3171 }
3172 }
3173 ConnectionSide::Server { .. } => {
3174 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3175 {
3176 self.discard_space(now, SpaceId::Initial);
3178 }
3179 if self.zero_rtt_crypto.is_some() && is_1rtt {
3180 self.set_key_discard_timer(now, space_id)
3182 }
3183 }
3184 }
3185 let space = self.spaces[space_id].for_path(path_id);
3186 space.pending_acks.insert_one(packet, now);
3187 if packet >= space.rx_packet.unwrap_or_default() {
3188 space.rx_packet = Some(packet);
3189 self.spin = self.side.is_client() ^ spin;
3191 }
3192 }
3193
3194 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3199 if let Some(timeout) = self.idle_timeout {
3201 if self.state.is_closed() {
3202 self.timers
3203 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3204 } else {
3205 let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3206 self.timers.set(
3207 Timer::Conn(ConnTimer::Idle),
3208 now + dt,
3209 self.qlog.with_time(now),
3210 );
3211 }
3212 }
3213
3214 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3216 if self.state.is_closed() {
3217 self.timers.stop(
3218 Timer::PerPath(path_id, PathTimer::PathIdle),
3219 self.qlog.with_time(now),
3220 );
3221 } else {
3222 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3223 self.timers.set(
3224 Timer::PerPath(path_id, PathTimer::PathIdle),
3225 now + dt,
3226 self.qlog.with_time(now),
3227 );
3228 }
3229 }
3230 }
3231
3232 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3234 if !self.state.is_established() {
3235 return;
3236 }
3237
3238 if let Some(interval) = self.config.keep_alive_interval {
3239 self.timers.set(
3240 Timer::Conn(ConnTimer::KeepAlive),
3241 now + interval,
3242 self.qlog.with_time(now),
3243 );
3244 }
3245
3246 if let Some(interval) = self.path_data(path_id).keep_alive {
3247 self.timers.set(
3248 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3249 now + interval,
3250 self.qlog.with_time(now),
3251 );
3252 }
3253 }
3254
3255 fn reset_cid_retirement(&mut self, now: Instant) {
3257 if let Some((_path, t)) = self.next_cid_retirement() {
3258 self.timers.set(
3259 Timer::Conn(ConnTimer::PushNewCid),
3260 t,
3261 self.qlog.with_time(now),
3262 );
3263 }
3264 }
3265
3266 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3268 self.local_cid_state
3269 .iter()
3270 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3271 .min_by_key(|(_path_id, timeout)| *timeout)
3272 }
3273
3274 pub(crate) fn handle_first_packet(
3279 &mut self,
3280 now: Instant,
3281 network_path: FourTuple,
3282 ecn: Option<EcnCodepoint>,
3283 packet_number: u64,
3284 packet: InitialPacket,
3285 remaining: Option<BytesMut>,
3286 ) -> Result<(), ConnectionError> {
3287 let span = trace_span!("first recv");
3288 let _guard = span.enter();
3289 debug_assert!(self.side.is_server());
3290 let len = packet.header_data.len() + packet.payload.len();
3291 let path_id = PathId::ZERO;
3292 self.path_data_mut(path_id).total_recvd = len as u64;
3293
3294 if let Some(hs) = self.state.as_handshake_mut() {
3295 hs.expected_token = packet.header.token.clone();
3296 } else {
3297 unreachable!("first packet must be delivered in Handshake state");
3298 }
3299
3300 self.on_packet_authenticated(
3302 now,
3303 SpaceId::Initial,
3304 path_id,
3305 ecn,
3306 Some(packet_number),
3307 false,
3308 false,
3309 );
3310
3311 let packet: Packet = packet.into();
3312
3313 let mut qlog = QlogRecvPacket::new(len);
3314 qlog.header(&packet.header, Some(packet_number), path_id);
3315
3316 self.process_decrypted_packet(
3317 now,
3318 network_path,
3319 path_id,
3320 Some(packet_number),
3321 packet,
3322 &mut qlog,
3323 )?;
3324 self.qlog.emit_packet_received(qlog, now);
3325 if let Some(data) = remaining {
3326 self.handle_coalesced(now, network_path, path_id, ecn, data);
3327 }
3328
3329 self.qlog.emit_recovery_metrics(
3330 path_id,
3331 &mut self.paths.get_mut(&path_id).unwrap().data,
3332 now,
3333 );
3334
3335 Ok(())
3336 }
3337
3338 fn init_0rtt(&mut self, now: Instant) {
3339 let (header, packet) = match self.crypto.early_crypto() {
3340 Some(x) => x,
3341 None => return,
3342 };
3343 if self.side.is_client() {
3344 match self.crypto.transport_parameters() {
3345 Ok(params) => {
3346 let params = params
3347 .expect("crypto layer didn't supply transport parameters with ticket");
3348 let params = TransportParameters {
3350 initial_src_cid: None,
3351 original_dst_cid: None,
3352 preferred_address: None,
3353 retry_src_cid: None,
3354 stateless_reset_token: None,
3355 min_ack_delay: None,
3356 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3357 max_ack_delay: TransportParameters::default().max_ack_delay,
3358 initial_max_path_id: None,
3359 ..params
3360 };
3361 self.set_peer_params(params);
3362 self.qlog.emit_peer_transport_params_restored(self, now);
3363 }
3364 Err(e) => {
3365 error!("session ticket has malformed transport parameters: {}", e);
3366 return;
3367 }
3368 }
3369 }
3370 trace!("0-RTT enabled");
3371 self.zero_rtt_enabled = true;
3372 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3373 }
3374
3375 fn read_crypto(
3376 &mut self,
3377 space: SpaceId,
3378 crypto: &frame::Crypto,
3379 payload_len: usize,
3380 ) -> Result<(), TransportError> {
3381 let expected = if !self.state.is_handshake() {
3382 SpaceId::Data
3383 } else if self.highest_space == SpaceId::Initial {
3384 SpaceId::Initial
3385 } else {
3386 SpaceId::Handshake
3389 };
3390 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3394
3395 let end = crypto.offset + crypto.data.len() as u64;
3396 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3397 warn!(
3398 "received new {:?} CRYPTO data when expecting {:?}",
3399 space, expected
3400 );
3401 return Err(TransportError::PROTOCOL_VIOLATION(
3402 "new data at unexpected encryption level",
3403 ));
3404 }
3405
3406 let space = &mut self.spaces[space];
3407 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3408 if max > self.config.crypto_buffer_size as u64 {
3409 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3410 }
3411
3412 space
3413 .crypto_stream
3414 .insert(crypto.offset, crypto.data.clone(), payload_len);
3415 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3416 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3417 if self.crypto.read_handshake(&chunk.bytes)? {
3418 self.events.push_back(Event::HandshakeDataReady);
3419 }
3420 }
3421
3422 Ok(())
3423 }
3424
3425 fn write_crypto(&mut self) {
3426 loop {
3427 let space = self.highest_space;
3428 let mut outgoing = Vec::new();
3429 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3430 match space {
3431 SpaceId::Initial => {
3432 self.upgrade_crypto(SpaceId::Handshake, crypto);
3433 }
3434 SpaceId::Handshake => {
3435 self.upgrade_crypto(SpaceId::Data, crypto);
3436 }
3437 _ => unreachable!("got updated secrets during 1-RTT"),
3438 }
3439 }
3440 if outgoing.is_empty() {
3441 if space == self.highest_space {
3442 break;
3443 } else {
3444 continue;
3446 }
3447 }
3448 let offset = self.spaces[space].crypto_offset;
3449 let outgoing = Bytes::from(outgoing);
3450 if let Some(hs) = self.state.as_handshake_mut() {
3451 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3452 hs.client_hello = Some(outgoing.clone());
3453 }
3454 }
3455 self.spaces[space].crypto_offset += outgoing.len() as u64;
3456 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3457 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3458 offset,
3459 data: outgoing,
3460 });
3461 }
3462 }
3463
3464 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3466 debug_assert!(
3467 self.spaces[space].crypto.is_none(),
3468 "already reached packet space {space:?}"
3469 );
3470 trace!("{:?} keys ready", space);
3471 if space == SpaceId::Data {
3472 self.next_crypto = Some(
3474 self.crypto
3475 .next_1rtt_keys()
3476 .expect("handshake should be complete"),
3477 );
3478 }
3479
3480 self.spaces[space].crypto = Some(crypto);
3481 debug_assert!(space as usize > self.highest_space as usize);
3482 self.highest_space = space;
3483 if space == SpaceId::Data && self.side.is_client() {
3484 self.zero_rtt_crypto = None;
3486 }
3487 }
3488
3489 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3490 debug_assert!(space_id != SpaceId::Data);
3491 trace!("discarding {:?} keys", space_id);
3492 if space_id == SpaceId::Initial {
3493 if let ConnectionSide::Client { token, .. } = &mut self.side {
3495 *token = Bytes::new();
3496 }
3497 }
3498 let space = &mut self.spaces[space_id];
3499 space.crypto = None;
3500 let pns = space.for_path(PathId::ZERO);
3501 pns.time_of_last_ack_eliciting_packet = None;
3502 pns.loss_time = None;
3503 pns.loss_probes = 0;
3504 let sent_packets = mem::take(&mut pns.sent_packets);
3505 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3506 for (_, packet) in sent_packets.into_iter() {
3507 path.data.remove_in_flight(&packet);
3508 }
3509
3510 self.set_loss_detection_timer(now, PathId::ZERO)
3511 }
3512
3513 fn handle_coalesced(
3514 &mut self,
3515 now: Instant,
3516 network_path: FourTuple,
3517 path_id: PathId,
3518 ecn: Option<EcnCodepoint>,
3519 data: BytesMut,
3520 ) {
3521 self.path_data_mut(path_id)
3522 .inc_total_recvd(data.len() as u64);
3523 let mut remaining = Some(data);
3524 let cid_len = self
3525 .local_cid_state
3526 .values()
3527 .map(|cid_state| cid_state.cid_len())
3528 .next()
3529 .expect("one cid_state must exist");
3530 while let Some(data) = remaining {
3531 match PartialDecode::new(
3532 data,
3533 &FixedLengthConnectionIdParser::new(cid_len),
3534 &[self.version],
3535 self.endpoint_config.grease_quic_bit,
3536 ) {
3537 Ok((partial_decode, rest)) => {
3538 remaining = rest;
3539 self.handle_decode(now, network_path, path_id, ecn, partial_decode);
3540 }
3541 Err(e) => {
3542 trace!("malformed header: {}", e);
3543 return;
3544 }
3545 }
3546 }
3547 }
3548
3549 fn handle_decode(
3550 &mut self,
3551 now: Instant,
3552 network_path: FourTuple,
3553 path_id: PathId,
3554 ecn: Option<EcnCodepoint>,
3555 partial_decode: PartialDecode,
3556 ) {
3557 let qlog = QlogRecvPacket::new(partial_decode.len());
3558 if let Some(decoded) = packet_crypto::unprotect_header(
3559 partial_decode,
3560 &self.spaces,
3561 self.zero_rtt_crypto.as_ref(),
3562 self.peer_params.stateless_reset_token,
3563 ) {
3564 self.handle_packet(
3565 now,
3566 network_path,
3567 path_id,
3568 ecn,
3569 decoded.packet,
3570 decoded.stateless_reset,
3571 qlog,
3572 );
3573 }
3574 }
3575
3576 fn handle_packet(
3577 &mut self,
3578 now: Instant,
3579 network_path: FourTuple,
3580 path_id: PathId,
3581 ecn: Option<EcnCodepoint>,
3582 packet: Option<Packet>,
3583 stateless_reset: bool,
3584 mut qlog: QlogRecvPacket,
3585 ) {
3586 self.stats.udp_rx.ios += 1;
3587 if let Some(ref packet) = packet {
3588 trace!(
3589 "got {:?} packet ({} bytes) from {} using id {}",
3590 packet.header.space(),
3591 packet.payload.len() + packet.header_data.len(),
3592 network_path,
3593 packet.header.dst_cid(),
3594 );
3595 }
3596
3597 if self.is_handshaking() {
3598 if path_id != PathId::ZERO {
3599 debug!(%network_path, %path_id, "discarding multipath packet during handshake");
3600 return;
3601 }
3602 if network_path != self.path_data_mut(path_id).network_path {
3603 if let Some(hs) = self.state.as_handshake() {
3604 if hs.allow_server_migration {
3605 trace!(%network_path, prev = %self.path_data(path_id).network_path, "server migrated to new remote");
3606 self.path_data_mut(path_id).network_path = network_path;
3607 self.qlog.emit_tuple_assigned(path_id, network_path, now);
3608 } else {
3609 debug!("discarding packet with unexpected remote during handshake");
3610 return;
3611 }
3612 } else {
3613 debug!("discarding packet with unexpected remote during handshake");
3614 return;
3615 }
3616 }
3617 }
3618
3619 let was_closed = self.state.is_closed();
3620 let was_drained = self.state.is_drained();
3621
3622 let decrypted = match packet {
3623 None => Err(None),
3624 Some(mut packet) => self
3625 .decrypt_packet(now, path_id, &mut packet)
3626 .map(move |number| (packet, number)),
3627 };
3628 let result = match decrypted {
3629 _ if stateless_reset => {
3630 debug!("got stateless reset");
3631 Err(ConnectionError::Reset)
3632 }
3633 Err(Some(e)) => {
3634 warn!("illegal packet: {}", e);
3635 Err(e.into())
3636 }
3637 Err(None) => {
3638 debug!("failed to authenticate packet");
3639 self.authentication_failures += 1;
3640 let integrity_limit = self.spaces[self.highest_space]
3641 .crypto
3642 .as_ref()
3643 .unwrap()
3644 .packet
3645 .local
3646 .integrity_limit();
3647 if self.authentication_failures > integrity_limit {
3648 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3649 } else {
3650 return;
3651 }
3652 }
3653 Ok((packet, number)) => {
3654 qlog.header(&packet.header, number, path_id);
3655 let span = match number {
3656 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3657 None => trace_span!("recv", space = ?packet.header.space()),
3658 };
3659 let _guard = span.enter();
3660
3661 let dedup = self.spaces[packet.header.space()]
3662 .path_space_mut(path_id)
3663 .map(|pns| &mut pns.dedup);
3664 if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3665 debug!("discarding possible duplicate packet");
3666 self.qlog.emit_packet_received(qlog, now);
3667 return;
3668 } else if self.state.is_handshake() && packet.header.is_short() {
3669 trace!("dropping short packet during handshake");
3671 self.qlog.emit_packet_received(qlog, now);
3672 return;
3673 } else {
3674 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3675 if let Some(hs) = self.state.as_handshake() {
3676 if self.side.is_server() && token != &hs.expected_token {
3677 warn!("discarding Initial with invalid retry token");
3681 self.qlog.emit_packet_received(qlog, now);
3682 return;
3683 }
3684 }
3685 }
3686
3687 if !self.state.is_closed() {
3688 let spin = match packet.header {
3689 Header::Short { spin, .. } => spin,
3690 _ => false,
3691 };
3692
3693 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3694 self.ensure_path(path_id, network_path, now, number);
3696 }
3697 if self.paths.contains_key(&path_id) {
3698 self.on_packet_authenticated(
3699 now,
3700 packet.header.space(),
3701 path_id,
3702 ecn,
3703 number,
3704 spin,
3705 packet.header.is_1rtt(),
3706 );
3707 }
3708 }
3709
3710 let res = self.process_decrypted_packet(
3711 now,
3712 network_path,
3713 path_id,
3714 number,
3715 packet,
3716 &mut qlog,
3717 );
3718
3719 self.qlog.emit_packet_received(qlog, now);
3720 res
3721 }
3722 }
3723 };
3724
3725 if let Err(conn_err) = result {
3727 match conn_err {
3728 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3729 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3730 ConnectionError::Reset
3731 | ConnectionError::TransportError(TransportError {
3732 code: TransportErrorCode::AEAD_LIMIT_REACHED,
3733 ..
3734 }) => {
3735 self.state.move_to_drained(Some(conn_err));
3736 }
3737 ConnectionError::TimedOut => {
3738 unreachable!("timeouts aren't generated by packet processing");
3739 }
3740 ConnectionError::TransportError(err) => {
3741 debug!("closing connection due to transport error: {}", err);
3742 self.state.move_to_closed(err);
3743 }
3744 ConnectionError::VersionMismatch => {
3745 self.state.move_to_draining(Some(conn_err));
3746 }
3747 ConnectionError::LocallyClosed => {
3748 unreachable!("LocallyClosed isn't generated by packet processing");
3749 }
3750 ConnectionError::CidsExhausted => {
3751 unreachable!("CidsExhausted isn't generated by packet processing");
3752 }
3753 };
3754 }
3755
3756 if !was_closed && self.state.is_closed() {
3757 self.close_common();
3758 if !self.state.is_drained() {
3759 self.set_close_timer(now);
3760 }
3761 }
3762 if !was_drained && self.state.is_drained() {
3763 self.endpoint_events.push_back(EndpointEventInner::Drained);
3764 self.timers
3767 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3768 }
3769
3770 if matches!(self.state.as_type(), StateType::Closed) {
3772 let path_remote = self
3776 .paths
3777 .get(&path_id)
3778 .map(|p| p.data.network_path)
3779 .unwrap_or(network_path);
3780 self.close = network_path == path_remote;
3781 }
3782 }
3783
3784 fn process_decrypted_packet(
3785 &mut self,
3786 now: Instant,
3787 network_path: FourTuple,
3788 path_id: PathId,
3789 number: Option<u64>,
3790 packet: Packet,
3791 qlog: &mut QlogRecvPacket,
3792 ) -> Result<(), ConnectionError> {
3793 if !self.paths.contains_key(&path_id) {
3794 trace!(%path_id, ?number, "discarding packet for unknown path");
3798 return Ok(());
3799 }
3800 let state = match self.state.as_type() {
3801 StateType::Established => {
3802 match packet.header.space() {
3803 SpaceId::Data => self.process_payload(
3804 now,
3805 network_path,
3806 path_id,
3807 number.unwrap(),
3808 packet,
3809 qlog,
3810 )?,
3811 _ if packet.header.has_frames() => {
3812 self.process_early_payload(now, path_id, packet, qlog)?
3813 }
3814 _ => {
3815 trace!("discarding unexpected pre-handshake packet");
3816 }
3817 }
3818 return Ok(());
3819 }
3820 StateType::Closed => {
3821 for result in frame::Iter::new(packet.payload.freeze())? {
3822 let frame = match result {
3823 Ok(frame) => frame,
3824 Err(err) => {
3825 debug!("frame decoding error: {err:?}");
3826 continue;
3827 }
3828 };
3829 qlog.frame(&frame);
3830
3831 if let Frame::Padding = frame {
3832 continue;
3833 };
3834
3835 self.stats.frame_rx.record(frame.ty());
3836
3837 if let Frame::Close(_error) = frame {
3838 self.state.move_to_draining(None);
3839 break;
3840 }
3841 }
3842 return Ok(());
3843 }
3844 StateType::Draining | StateType::Drained => return Ok(()),
3845 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3846 };
3847
3848 match packet.header {
3849 Header::Retry {
3850 src_cid: rem_cid, ..
3851 } => {
3852 debug_assert_eq!(path_id, PathId::ZERO);
3853 if self.side.is_server() {
3854 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3855 }
3856
3857 let is_valid_retry = self
3858 .rem_cids
3859 .get(&path_id)
3860 .map(|cids| cids.active())
3861 .map(|orig_dst_cid| {
3862 self.crypto.is_valid_retry(
3863 orig_dst_cid,
3864 &packet.header_data,
3865 &packet.payload,
3866 )
3867 })
3868 .unwrap_or_default();
3869 if self.total_authed_packets > 1
3870 || packet.payload.len() <= 16 || !is_valid_retry
3872 {
3873 trace!("discarding invalid Retry");
3874 return Ok(());
3882 }
3883
3884 trace!("retrying with CID {}", rem_cid);
3885 let client_hello = state.client_hello.take().unwrap();
3886 self.retry_src_cid = Some(rem_cid);
3887 self.rem_cids
3888 .get_mut(&path_id)
3889 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3890 .update_initial_cid(rem_cid);
3891 self.rem_handshake_cid = rem_cid;
3892
3893 let space = &mut self.spaces[SpaceId::Initial];
3894 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3895 self.on_packet_acked(now, PathId::ZERO, info);
3896 };
3897
3898 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = {
3901 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3902 space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3903 space.crypto_offset = client_hello.len() as u64;
3904 space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3905 .for_path(path_id)
3906 .next_packet_number;
3907 space.pending.crypto.push_back(frame::Crypto {
3908 offset: 0,
3909 data: client_hello,
3910 });
3911 space
3912 };
3913
3914 let zero_rtt = mem::take(
3916 &mut self.spaces[SpaceId::Data]
3917 .for_path(PathId::ZERO)
3918 .sent_packets,
3919 );
3920 for (_, info) in zero_rtt.into_iter() {
3921 self.paths
3922 .get_mut(&PathId::ZERO)
3923 .unwrap()
3924 .remove_in_flight(&info);
3925 self.spaces[SpaceId::Data].pending |= info.retransmits;
3926 }
3927 self.streams.retransmit_all_for_0rtt();
3928
3929 let token_len = packet.payload.len() - 16;
3930 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3931 unreachable!("we already short-circuited if we're server");
3932 };
3933 *token = packet.payload.freeze().split_to(token_len);
3934
3935 self.state = State::handshake(state::Handshake {
3936 expected_token: Bytes::new(),
3937 rem_cid_set: false,
3938 client_hello: None,
3939 allow_server_migration: true,
3940 });
3941 Ok(())
3942 }
3943 Header::Long {
3944 ty: LongType::Handshake,
3945 src_cid: rem_cid,
3946 dst_cid: loc_cid,
3947 ..
3948 } => {
3949 debug_assert_eq!(path_id, PathId::ZERO);
3950 if rem_cid != self.rem_handshake_cid {
3951 debug!(
3952 "discarding packet with mismatched remote CID: {} != {}",
3953 self.rem_handshake_cid, rem_cid
3954 );
3955 return Ok(());
3956 }
3957 self.on_path_validated(path_id);
3958
3959 self.process_early_payload(now, path_id, packet, qlog)?;
3960 if self.state.is_closed() {
3961 return Ok(());
3962 }
3963
3964 if self.crypto.is_handshaking() {
3965 trace!("handshake ongoing");
3966 return Ok(());
3967 }
3968
3969 if self.side.is_client() {
3970 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3972 TransportError::new(
3973 TransportErrorCode::crypto(0x6d),
3974 "transport parameters missing".to_owned(),
3975 )
3976 })?;
3977
3978 if self.has_0rtt() {
3979 if !self.crypto.early_data_accepted().unwrap() {
3980 debug_assert!(self.side.is_client());
3981 debug!("0-RTT rejected");
3982 self.accepted_0rtt = false;
3983 self.streams.zero_rtt_rejected();
3984
3985 self.spaces[SpaceId::Data].pending = Retransmits::default();
3987
3988 let sent_packets = mem::take(
3990 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3991 );
3992 for (_, packet) in sent_packets.into_iter() {
3993 self.paths
3994 .get_mut(&path_id)
3995 .unwrap()
3996 .remove_in_flight(&packet);
3997 }
3998 } else {
3999 self.accepted_0rtt = true;
4000 params.validate_resumption_from(&self.peer_params)?;
4001 }
4002 }
4003 if let Some(token) = params.stateless_reset_token {
4004 let remote = self.path_data(path_id).network_path.remote;
4006 self.endpoint_events
4007 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4008 }
4009 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4010 self.issue_first_cids(now);
4011 } else {
4012 self.spaces[SpaceId::Data].pending.handshake_done = true;
4014 self.discard_space(now, SpaceId::Handshake);
4015 self.events.push_back(Event::HandshakeConfirmed);
4016 trace!("handshake confirmed");
4017 }
4018
4019 self.events.push_back(Event::Connected);
4020 self.state.move_to_established();
4021 trace!("established");
4022
4023 self.issue_first_path_cids(now);
4026 Ok(())
4027 }
4028 Header::Initial(InitialHeader {
4029 src_cid: rem_cid,
4030 dst_cid: loc_cid,
4031 ..
4032 }) => {
4033 debug_assert_eq!(path_id, PathId::ZERO);
4034 if !state.rem_cid_set {
4035 trace!("switching remote CID to {}", rem_cid);
4036 let mut state = state.clone();
4037 self.rem_cids
4038 .get_mut(&path_id)
4039 .expect("PathId::ZERO not yet abandoned")
4040 .update_initial_cid(rem_cid);
4041 self.rem_handshake_cid = rem_cid;
4042 self.orig_rem_cid = rem_cid;
4043 state.rem_cid_set = true;
4044 self.state.move_to_handshake(state);
4045 } else if rem_cid != self.rem_handshake_cid {
4046 debug!(
4047 "discarding packet with mismatched remote CID: {} != {}",
4048 self.rem_handshake_cid, rem_cid
4049 );
4050 return Ok(());
4051 }
4052
4053 let starting_space = self.highest_space;
4054 self.process_early_payload(now, path_id, packet, qlog)?;
4055
4056 if self.side.is_server()
4057 && starting_space == SpaceId::Initial
4058 && self.highest_space != SpaceId::Initial
4059 {
4060 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4061 TransportError::new(
4062 TransportErrorCode::crypto(0x6d),
4063 "transport parameters missing".to_owned(),
4064 )
4065 })?;
4066 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4067 self.issue_first_cids(now);
4068 self.init_0rtt(now);
4069 }
4070 Ok(())
4071 }
4072 Header::Long {
4073 ty: LongType::ZeroRtt,
4074 ..
4075 } => {
4076 self.process_payload(now, network_path, path_id, number.unwrap(), packet, qlog)?;
4077 Ok(())
4078 }
4079 Header::VersionNegotiate { .. } => {
4080 if self.total_authed_packets > 1 {
4081 return Ok(());
4082 }
4083 let supported = packet
4084 .payload
4085 .chunks(4)
4086 .any(|x| match <[u8; 4]>::try_from(x) {
4087 Ok(version) => self.version == u32::from_be_bytes(version),
4088 Err(_) => false,
4089 });
4090 if supported {
4091 return Ok(());
4092 }
4093 debug!("remote doesn't support our version");
4094 Err(ConnectionError::VersionMismatch)
4095 }
4096 Header::Short { .. } => unreachable!(
4097 "short packets received during handshake are discarded in handle_packet"
4098 ),
4099 }
4100 }
4101
4102 fn process_early_payload(
4104 &mut self,
4105 now: Instant,
4106 path_id: PathId,
4107 packet: Packet,
4108 #[allow(unused)] qlog: &mut QlogRecvPacket,
4109 ) -> Result<(), TransportError> {
4110 debug_assert_ne!(packet.header.space(), SpaceId::Data);
4111 debug_assert_eq!(path_id, PathId::ZERO);
4112 let payload_len = packet.payload.len();
4113 let mut ack_eliciting = false;
4114 for result in frame::Iter::new(packet.payload.freeze())? {
4115 let frame = result?;
4116 qlog.frame(&frame);
4117 let span = match frame {
4118 Frame::Padding => continue,
4119 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4120 };
4121
4122 self.stats.frame_rx.record(frame.ty());
4123
4124 let _guard = span.as_ref().map(|x| x.enter());
4125 ack_eliciting |= frame.is_ack_eliciting();
4126
4127 if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4129 return Err(TransportError::PROTOCOL_VIOLATION(
4130 "illegal frame type in handshake",
4131 ));
4132 }
4133
4134 match frame {
4135 Frame::Padding | Frame::Ping => {}
4136 Frame::Crypto(frame) => {
4137 self.read_crypto(packet.header.space(), &frame, payload_len)?;
4138 }
4139 Frame::Ack(ack) => {
4140 self.on_ack_received(now, packet.header.space(), ack)?;
4141 }
4142 Frame::PathAck(ack) => {
4143 span.as_ref()
4144 .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4145 self.on_path_ack_received(now, packet.header.space(), ack)?;
4146 }
4147 Frame::Close(reason) => {
4148 self.state.move_to_draining(Some(reason.into()));
4149 return Ok(());
4150 }
4151 _ => {
4152 let mut err =
4153 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4154 err.frame = frame::MaybeFrame::Known(frame.ty());
4155 return Err(err);
4156 }
4157 }
4158 }
4159
4160 if ack_eliciting {
4161 self.spaces[packet.header.space()]
4163 .for_path(path_id)
4164 .pending_acks
4165 .set_immediate_ack_required();
4166 }
4167
4168 self.write_crypto();
4169 Ok(())
4170 }
4171
4172 fn process_payload(
4174 &mut self,
4175 now: Instant,
4176 network_path: FourTuple,
4177 path_id: PathId,
4178 number: u64,
4179 packet: Packet,
4180 #[allow(unused)] qlog: &mut QlogRecvPacket,
4181 ) -> Result<(), TransportError> {
4182 let payload = packet.payload.freeze();
4183 let mut is_probing_packet = true;
4184 let mut close = None;
4185 let payload_len = payload.len();
4186 let mut ack_eliciting = false;
4187 let mut migration_observed_addr = None;
4190 for result in frame::Iter::new(payload)? {
4191 let frame = result?;
4192 qlog.frame(&frame);
4193 let span = match frame {
4194 Frame::Padding => continue,
4195 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4196 };
4197
4198 self.stats.frame_rx.record(frame.ty());
4199 match &frame {
4202 Frame::Crypto(f) => {
4203 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4204 }
4205 Frame::Stream(f) => {
4206 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4207 }
4208 Frame::Datagram(f) => {
4209 trace!(len = f.data.len(), "got datagram frame");
4210 }
4211 f => {
4212 trace!("got frame {f}");
4213 }
4214 }
4215
4216 let _guard = span.enter();
4217 if packet.header.is_0rtt() {
4218 match frame {
4219 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4220 return Err(TransportError::PROTOCOL_VIOLATION(
4221 "illegal frame type in 0-RTT",
4222 ));
4223 }
4224 _ => {
4225 if frame.is_1rtt() {
4226 return Err(TransportError::PROTOCOL_VIOLATION(
4227 "illegal frame type in 0-RTT",
4228 ));
4229 }
4230 }
4231 }
4232 }
4233 ack_eliciting |= frame.is_ack_eliciting();
4234
4235 match frame {
4237 Frame::Padding
4238 | Frame::PathChallenge(_)
4239 | Frame::PathResponse(_)
4240 | Frame::NewConnectionId(_)
4241 | Frame::ObservedAddr(_) => {}
4242 _ => {
4243 is_probing_packet = false;
4244 }
4245 }
4246
4247 match frame {
4248 Frame::Crypto(frame) => {
4249 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4250 }
4251 Frame::Stream(frame) => {
4252 if self.streams.received(frame, payload_len)?.should_transmit() {
4253 self.spaces[SpaceId::Data].pending.max_data = true;
4254 }
4255 }
4256 Frame::Ack(ack) => {
4257 self.on_ack_received(now, SpaceId::Data, ack)?;
4258 }
4259 Frame::PathAck(ack) => {
4260 span.record("path", tracing::field::debug(&ack.path_id));
4261 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4262 }
4263 Frame::Padding | Frame::Ping => {}
4264 Frame::Close(reason) => {
4265 close = Some(reason);
4266 }
4267 Frame::PathChallenge(challenge) => {
4268 let path = &mut self
4269 .path_mut(path_id)
4270 .expect("payload is processed only after the path becomes known");
4271 path.path_responses.push(number, challenge.0, network_path);
4272 if network_path == path.network_path {
4275 match self.peer_supports_ack_frequency() {
4285 true => self.immediate_ack(path_id),
4286 false => {
4287 self.ping_path(path_id).ok();
4288 }
4289 }
4290 }
4291 }
4292 Frame::PathResponse(response) => {
4293 let path = self
4294 .paths
4295 .get_mut(&path_id)
4296 .expect("payload is processed only after the path becomes known");
4297
4298 use PathTimer::*;
4299 use paths::OnPathResponseReceived::*;
4300 match path
4301 .data
4302 .on_path_response_received(now, response.0, network_path)
4303 {
4304 OnPath { was_open } => {
4305 let qlog = self.qlog.with_time(now);
4306
4307 self.timers
4308 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4309 self.timers
4310 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4311
4312 let next_challenge = path
4313 .data
4314 .earliest_expiring_challenge()
4315 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4316 self.timers.set_or_stop(
4317 Timer::PerPath(path_id, PathChallengeLost),
4318 next_challenge,
4319 qlog,
4320 );
4321
4322 if !was_open {
4323 self.events
4324 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4325 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4326 {
4327 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4328 id: path_id,
4329 addr: observed.socket_addr(),
4330 }));
4331 }
4332 }
4333 if let Some((_, ref mut prev)) = path.prev {
4334 prev.challenges_sent.clear();
4335 prev.send_new_challenge = false;
4336 }
4337 }
4338 OffPath => {
4339 debug!("Response to off-path PathChallenge!");
4340 let next_challenge = path
4341 .data
4342 .earliest_expiring_challenge()
4343 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4344 self.timers.set_or_stop(
4345 Timer::PerPath(path_id, PathChallengeLost),
4346 next_challenge,
4347 self.qlog.with_time(now),
4348 );
4349 }
4350 Invalid { expected } => {
4351 debug!(%response, %network_path, %expected, "ignoring invalid PATH_RESPONSE")
4352 }
4353 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4354 }
4355 }
4356 Frame::MaxData(frame::MaxData(bytes)) => {
4357 self.streams.received_max_data(bytes);
4358 }
4359 Frame::MaxStreamData(frame::MaxStreamData { id, offset }) => {
4360 self.streams.received_max_stream_data(id, offset)?;
4361 }
4362 Frame::MaxStreams(frame::MaxStreams { dir, count }) => {
4363 self.streams.received_max_streams(dir, count)?;
4364 }
4365 Frame::ResetStream(frame) => {
4366 if self.streams.received_reset(frame)?.should_transmit() {
4367 self.spaces[SpaceId::Data].pending.max_data = true;
4368 }
4369 }
4370 Frame::DataBlocked { offset } => {
4371 debug!(offset, "peer claims to be blocked at connection level");
4372 }
4373 Frame::StreamDataBlocked { id, offset } => {
4374 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4375 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4376 return Err(TransportError::STREAM_STATE_ERROR(
4377 "STREAM_DATA_BLOCKED on send-only stream",
4378 ));
4379 }
4380 debug!(
4381 stream = %id,
4382 offset, "peer claims to be blocked at stream level"
4383 );
4384 }
4385 Frame::StreamsBlocked { dir, limit } => {
4386 if limit > MAX_STREAM_COUNT {
4387 return Err(TransportError::FRAME_ENCODING_ERROR(
4388 "unrepresentable stream limit",
4389 ));
4390 }
4391 debug!(
4392 "peer claims to be blocked opening more than {} {} streams",
4393 limit, dir
4394 );
4395 }
4396 Frame::StopSending(frame::StopSending { id, error_code }) => {
4397 if id.initiator() != self.side.side() {
4398 if id.dir() == Dir::Uni {
4399 debug!("got STOP_SENDING on recv-only {}", id);
4400 return Err(TransportError::STREAM_STATE_ERROR(
4401 "STOP_SENDING on recv-only stream",
4402 ));
4403 }
4404 } else if self.streams.is_local_unopened(id) {
4405 return Err(TransportError::STREAM_STATE_ERROR(
4406 "STOP_SENDING on unopened stream",
4407 ));
4408 }
4409 self.streams.received_stop_sending(id, error_code);
4410 }
4411 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4412 if let Some(ref path_id) = path_id {
4413 span.record("path", tracing::field::debug(&path_id));
4414 }
4415 let path_id = path_id.unwrap_or_default();
4416 match self.local_cid_state.get_mut(&path_id) {
4417 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4418 Some(cid_state) => {
4419 let allow_more_cids = cid_state
4420 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4421
4422 let has_path = !self.abandoned_paths.contains(&path_id);
4426 let allow_more_cids = allow_more_cids && has_path;
4427
4428 self.endpoint_events
4429 .push_back(EndpointEventInner::RetireConnectionId(
4430 now,
4431 path_id,
4432 sequence,
4433 allow_more_cids,
4434 ));
4435 }
4436 }
4437 }
4438 Frame::NewConnectionId(frame) => {
4439 let path_id = if let Some(path_id) = frame.path_id {
4440 if !self.is_multipath_negotiated() {
4441 return Err(TransportError::PROTOCOL_VIOLATION(
4442 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4443 ));
4444 }
4445 if path_id > self.local_max_path_id {
4446 return Err(TransportError::PROTOCOL_VIOLATION(
4447 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4448 ));
4449 }
4450 path_id
4451 } else {
4452 PathId::ZERO
4453 };
4454
4455 if self.abandoned_paths.contains(&path_id) {
4456 trace!("ignoring issued CID for abandoned path");
4457 continue;
4458 }
4459 if let Some(ref path_id) = frame.path_id {
4460 span.record("path", tracing::field::debug(&path_id));
4461 }
4462 let rem_cids = self
4463 .rem_cids
4464 .entry(path_id)
4465 .or_insert_with(|| CidQueue::new(frame.id));
4466 if rem_cids.active().is_empty() {
4467 return Err(TransportError::PROTOCOL_VIOLATION(
4468 "NEW_CONNECTION_ID when CIDs aren't in use",
4469 ));
4470 }
4471 if frame.retire_prior_to > frame.sequence {
4472 return Err(TransportError::PROTOCOL_VIOLATION(
4473 "NEW_CONNECTION_ID retiring unissued CIDs",
4474 ));
4475 }
4476
4477 use crate::cid_queue::InsertError;
4478 match rem_cids.insert(frame) {
4479 Ok(None) if self.path(path_id).is_none() => {
4480 self.continue_nat_traversal_round(now);
4483 }
4484 Ok(None) => {}
4485 Ok(Some((retired, reset_token))) => {
4486 let pending_retired =
4487 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4488 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4491 if (pending_retired.len() as u64)
4494 .saturating_add(retired.end.saturating_sub(retired.start))
4495 > MAX_PENDING_RETIRED_CIDS
4496 {
4497 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4498 "queued too many retired CIDs",
4499 ));
4500 }
4501 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4502 self.set_reset_token(path_id, network_path.remote, reset_token);
4504 }
4505 Err(InsertError::ExceedsLimit) => {
4506 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4507 }
4508 Err(InsertError::Retired) => {
4509 trace!("discarding already-retired");
4510 self.spaces[SpaceId::Data]
4514 .pending
4515 .retire_cids
4516 .push((path_id, frame.sequence));
4517 continue;
4518 }
4519 };
4520
4521 if self.side.is_server()
4522 && path_id == PathId::ZERO
4523 && self
4524 .rem_cids
4525 .get(&PathId::ZERO)
4526 .map(|cids| cids.active_seq() == 0)
4527 .unwrap_or_default()
4528 {
4529 self.update_rem_cid(PathId::ZERO);
4532 }
4533 }
4534 Frame::NewToken(NewToken { token }) => {
4535 let ConnectionSide::Client {
4536 token_store,
4537 server_name,
4538 ..
4539 } = &self.side
4540 else {
4541 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4542 };
4543 if token.is_empty() {
4544 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4545 }
4546 trace!("got new token");
4547 token_store.insert(server_name, token);
4548 }
4549 Frame::Datagram(datagram) => {
4550 if self
4551 .datagrams
4552 .received(datagram, &self.config.datagram_receive_buffer_size)?
4553 {
4554 self.events.push_back(Event::DatagramReceived);
4555 }
4556 }
4557 Frame::AckFrequency(ack_frequency) => {
4558 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4561 continue;
4564 }
4565
4566 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4568 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4569
4570 if let Some(timeout) = space
4573 .pending_acks
4574 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4575 {
4576 self.timers.set(
4577 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4578 timeout,
4579 self.qlog.with_time(now),
4580 );
4581 }
4582 }
4583 }
4584 Frame::ImmediateAck => {
4585 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4587 pns.pending_acks.set_immediate_ack_required();
4588 }
4589 }
4590 Frame::HandshakeDone => {
4591 if self.side.is_server() {
4592 return Err(TransportError::PROTOCOL_VIOLATION(
4593 "client sent HANDSHAKE_DONE",
4594 ));
4595 }
4596 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4597 self.discard_space(now, SpaceId::Handshake);
4598 }
4599 self.events.push_back(Event::HandshakeConfirmed);
4600 trace!("handshake confirmed");
4601 }
4602 Frame::ObservedAddr(observed) => {
4603 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4605 if !self
4606 .peer_params
4607 .address_discovery_role
4608 .should_report(&self.config.address_discovery_role)
4609 {
4610 return Err(TransportError::PROTOCOL_VIOLATION(
4611 "received OBSERVED_ADDRESS frame when not negotiated",
4612 ));
4613 }
4614 if packet.header.space() != SpaceId::Data {
4616 return Err(TransportError::PROTOCOL_VIOLATION(
4617 "OBSERVED_ADDRESS frame outside data space",
4618 ));
4619 }
4620
4621 let path = self.path_data_mut(path_id);
4622 if network_path == path.network_path {
4623 if let Some(updated) = path.update_observed_addr_report(observed) {
4624 if path.open {
4625 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4626 id: path_id,
4627 addr: updated,
4628 }));
4629 }
4630 }
4632 } else {
4633 migration_observed_addr = Some(observed)
4635 }
4636 }
4637 Frame::PathAbandon(frame::PathAbandon {
4638 path_id,
4639 error_code,
4640 }) => {
4641 span.record("path", tracing::field::debug(&path_id));
4642 let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4644 Ok(()) => {
4645 trace!("peer abandoned path");
4646 false
4647 }
4648 Err(ClosePathError::LastOpenPath) => {
4649 trace!("peer abandoned last path, closing connection");
4650 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4652 }
4653 Err(ClosePathError::ClosedPath) => {
4654 trace!("peer abandoned already closed path");
4655 true
4656 }
4657 };
4658 if self.path(path_id).is_some() && !already_abandoned {
4663 let delay = self.pto(SpaceId::Data, path_id) * 3;
4668 self.timers.set(
4669 Timer::PerPath(path_id, PathTimer::DiscardPath),
4670 now + delay,
4671 self.qlog.with_time(now),
4672 );
4673 }
4674 }
4675 Frame::PathStatusAvailable(info) => {
4676 span.record("path", tracing::field::debug(&info.path_id));
4677 if self.is_multipath_negotiated() {
4678 self.on_path_status(
4679 info.path_id,
4680 PathStatus::Available,
4681 info.status_seq_no,
4682 );
4683 } else {
4684 return Err(TransportError::PROTOCOL_VIOLATION(
4685 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4686 ));
4687 }
4688 }
4689 Frame::PathStatusBackup(info) => {
4690 span.record("path", tracing::field::debug(&info.path_id));
4691 if self.is_multipath_negotiated() {
4692 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4693 } else {
4694 return Err(TransportError::PROTOCOL_VIOLATION(
4695 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4696 ));
4697 }
4698 }
4699 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4700 span.record("path", tracing::field::debug(&path_id));
4701 if !self.is_multipath_negotiated() {
4702 return Err(TransportError::PROTOCOL_VIOLATION(
4703 "received MAX_PATH_ID frame when multipath was not negotiated",
4704 ));
4705 }
4706 if path_id > self.remote_max_path_id {
4708 self.remote_max_path_id = path_id;
4709 self.issue_first_path_cids(now);
4710 while let Some(true) = self.continue_nat_traversal_round(now) {}
4711 }
4712 }
4713 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4714 if self.is_multipath_negotiated() {
4718 if max_path_id > self.local_max_path_id {
4719 return Err(TransportError::PROTOCOL_VIOLATION(
4720 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4721 ));
4722 }
4723 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4724 } else {
4726 return Err(TransportError::PROTOCOL_VIOLATION(
4727 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4728 ));
4729 }
4730 }
4731 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4732 if self.is_multipath_negotiated() {
4740 if path_id > self.local_max_path_id {
4741 return Err(TransportError::PROTOCOL_VIOLATION(
4742 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4743 ));
4744 }
4745 if next_seq.0
4746 > self
4747 .local_cid_state
4748 .get(&path_id)
4749 .map(|cid_state| cid_state.active_seq().1 + 1)
4750 .unwrap_or_default()
4751 {
4752 return Err(TransportError::PROTOCOL_VIOLATION(
4753 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4754 ));
4755 }
4756 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4757 } else {
4758 return Err(TransportError::PROTOCOL_VIOLATION(
4759 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4760 ));
4761 }
4762 }
4763 Frame::AddAddress(addr) => {
4764 let client_state = match self.iroh_hp.client_side_mut() {
4765 Ok(state) => state,
4766 Err(err) => {
4767 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4768 "Nat traversal(ADD_ADDRESS): {err}"
4769 )));
4770 }
4771 };
4772
4773 if !client_state.check_remote_address(&addr) {
4774 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4776 }
4777
4778 match client_state.add_remote_address(addr) {
4779 Ok(maybe_added) => {
4780 if let Some(added) = maybe_added {
4781 self.events.push_back(Event::NatTraversal(
4782 iroh_hp::Event::AddressAdded(added),
4783 ));
4784 }
4785 }
4786 Err(e) => {
4787 warn!(%e, "failed to add remote address")
4788 }
4789 }
4790 }
4791 Frame::RemoveAddress(addr) => {
4792 let client_state = match self.iroh_hp.client_side_mut() {
4793 Ok(state) => state,
4794 Err(err) => {
4795 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4796 "Nat traversal(REMOVE_ADDRESS): {err}"
4797 )));
4798 }
4799 };
4800 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4801 self.events
4802 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4803 removed_addr,
4804 )));
4805 }
4806 }
4807 Frame::ReachOut(reach_out) => {
4808 let server_state = match self.iroh_hp.server_side_mut() {
4809 Ok(state) => state,
4810 Err(err) => {
4811 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4812 "Nat traversal(REACH_OUT): {err}"
4813 )));
4814 }
4815 };
4816
4817 if let Err(err) = server_state.handle_reach_out(reach_out) {
4818 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4819 "Nat traversal(REACH_OUT): {err}"
4820 )));
4821 }
4822 }
4823 }
4824 }
4825
4826 let space = self.spaces[SpaceId::Data].for_path(path_id);
4827 if space
4828 .pending_acks
4829 .packet_received(now, number, ack_eliciting, &space.dedup)
4830 {
4831 if self.abandoned_paths.contains(&path_id) {
4832 space.pending_acks.set_immediate_ack_required();
4835 } else {
4836 self.timers.set(
4837 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4838 now + self.ack_frequency.max_ack_delay,
4839 self.qlog.with_time(now),
4840 );
4841 }
4842 }
4843
4844 let pending = &mut self.spaces[SpaceId::Data].pending;
4849 self.streams.queue_max_stream_id(pending);
4850
4851 if let Some(reason) = close {
4852 self.state.move_to_draining(Some(reason.into()));
4853 self.close = true;
4854 }
4855
4856 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4857 && !is_probing_packet
4858 && network_path != self.path_data(path_id).network_path
4859 {
4860 let ConnectionSide::Server { ref server_config } = self.side else {
4861 panic!("packets from unknown remote should be dropped by clients");
4862 };
4863 debug_assert!(
4864 server_config.migration,
4865 "migration-initiating packets should have been dropped immediately"
4866 );
4867 self.migrate(path_id, now, network_path, migration_observed_addr);
4868 self.update_rem_cid(path_id);
4870 self.spin = false;
4871 }
4872
4873 Ok(())
4874 }
4875
4876 fn migrate(
4877 &mut self,
4878 path_id: PathId,
4879 now: Instant,
4880 network_path: FourTuple,
4881 observed_addr: Option<ObservedAddr>,
4882 ) {
4883 trace!(%network_path, %path_id, "migration initiated");
4884 self.path_generation_counter = self.path_generation_counter.wrapping_add(1);
4885 let prev_pto = self.pto(SpaceId::Data, path_id);
4892 let known_path = self.paths.get_mut(&path_id).expect("known path");
4893 let path = &mut known_path.data;
4894 let mut new_path = if network_path.remote.is_ipv4()
4895 && network_path.remote.ip() == path.network_path.remote.ip()
4896 {
4897 PathData::from_previous(network_path, path, self.path_generation_counter, now)
4898 } else {
4899 let peer_max_udp_payload_size =
4900 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4901 .unwrap_or(u16::MAX);
4902 PathData::new(
4903 network_path,
4904 self.allow_mtud,
4905 Some(peer_max_udp_payload_size),
4906 self.path_generation_counter,
4907 now,
4908 &self.config,
4909 )
4910 };
4911 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4912 if let Some(report) = observed_addr {
4913 if let Some(updated) = new_path.update_observed_addr_report(report) {
4914 tracing::info!("adding observed addr event from migration");
4915 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4916 id: path_id,
4917 addr: updated,
4918 }));
4919 }
4920 }
4921 new_path.send_new_challenge = true;
4922
4923 let mut prev = mem::replace(path, new_path);
4924 if !prev.is_validating_path() {
4926 prev.send_new_challenge = true;
4927 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4931 }
4932
4933 self.qlog.emit_tuple_assigned(path_id, network_path, now);
4935
4936 self.timers.set(
4937 Timer::PerPath(path_id, PathTimer::PathValidation),
4938 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4939 self.qlog.with_time(now),
4940 );
4941 }
4942
/// Reacts to a change of the local address.
///
/// Advances [`PathId::ZERO`] to its next remote connection ID via `update_rem_cid` (which
/// does nothing when no further CID is available) and then calls `ping`.
pub fn local_address_changed(&mut self) {
    self.update_rem_cid(PathId::ZERO);
    self.ping();
}
4949
4950 fn update_rem_cid(&mut self, path_id: PathId) {
4952 let Some((reset_token, retired)) =
4953 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4954 else {
4955 return;
4956 };
4957
4958 self.spaces[SpaceId::Data]
4960 .pending
4961 .retire_cids
4962 .extend(retired.map(|seq| (path_id, seq)));
4963 let remote = self.path_data(path_id).network_path.remote;
4964 self.set_reset_token(path_id, remote, reset_token);
4965 }
4966
4967 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4976 self.endpoint_events
4977 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4978
4979 if path_id == PathId::ZERO {
4985 self.peer_params.stateless_reset_token = Some(reset_token);
4986 }
4987 }
4988
4989 fn issue_first_cids(&mut self, now: Instant) {
4991 if self
4992 .local_cid_state
4993 .get(&PathId::ZERO)
4994 .expect("PathId::ZERO exists when the connection is created")
4995 .cid_len()
4996 == 0
4997 {
4998 return;
4999 }
5000
5001 let mut n = self.peer_params.issue_cids_limit() - 1;
5003 if let ConnectionSide::Server { server_config } = &self.side {
5004 if server_config.has_preferred_address() {
5005 n -= 1;
5007 }
5008 }
5009 self.endpoint_events
5010 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5011 }
5012
5013 fn issue_first_path_cids(&mut self, now: Instant) {
5017 if let Some(max_path_id) = self.max_path_id() {
5018 let mut path_id = self.max_path_id_with_cids.next();
5019 while path_id <= max_path_id {
5020 self.endpoint_events
5021 .push_back(EndpointEventInner::NeedIdentifiers(
5022 path_id,
5023 now,
5024 self.peer_params.issue_cids_limit(),
5025 ));
5026 path_id = path_id.next();
5027 }
5028 self.max_path_id_with_cids = max_path_id;
5029 }
5030 }
5031
5032 fn populate_packet<'a, 'b>(
5040 &mut self,
5041 now: Instant,
5042 space_id: SpaceId,
5043 path_id: PathId,
5044 path_exclusive_only: bool,
5045 builder: &mut PacketBuilder<'a, 'b>,
5046 ) {
5047 let pn = builder.exact_number;
5048 let is_multipath_negotiated = self.is_multipath_negotiated();
5049 let stats = &mut self.stats.frame_tx;
5050 let space = &mut self.spaces[space_id];
5051 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5052 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5053 space
5054 .for_path(path_id)
5055 .pending_acks
5056 .maybe_ack_non_eliciting();
5057
5058 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5060 builder.write_frame(frame::HandshakeDone, stats);
5061 }
5062
5063 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5066 while let Some(local_addr) = addresses.pop() {
5067 let reach_out = frame::ReachOut::new(*round, local_addr);
5068 if builder.frame_space_remaining() > reach_out.size() {
5069 builder.write_frame(reach_out, stats);
5070 } else {
5071 addresses.push(local_addr);
5072 break;
5073 }
5074 }
5075 if addresses.is_empty() {
5076 space.pending.reach_out = None;
5077 }
5078 }
5079
5080 if !path_exclusive_only
5082 && space_id == SpaceId::Data
5083 && self
5084 .config
5085 .address_discovery_role
5086 .should_report(&self.peer_params.address_discovery_role)
5087 && (!path.observed_addr_sent || space.pending.observed_addr)
5088 {
5089 let frame =
5090 frame::ObservedAddr::new(path.network_path.remote, self.next_observed_addr_seq_no);
5091 if builder.frame_space_remaining() > frame.size() {
5092 builder.write_frame(frame, stats);
5093
5094 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5095 path.observed_addr_sent = true;
5096
5097 space.pending.observed_addr = false;
5098 }
5099 }
5100
5101 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5103 builder.write_frame(frame::Ping, stats);
5104 }
5105
5106 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5108 debug_assert_eq!(
5109 space_id,
5110 SpaceId::Data,
5111 "immediate acks must be sent in the data space"
5112 );
5113 builder.write_frame(frame::ImmediateAck, stats);
5114 }
5115
5116 if !path_exclusive_only {
5120 for path_id in space
5121 .number_spaces
5122 .iter_mut()
5123 .filter(|(_, pns)| pns.pending_acks.can_send())
5124 .map(|(&path_id, _)| path_id)
5125 .collect::<Vec<_>>()
5126 {
5127 Self::populate_acks(
5128 now,
5129 self.receiving_ecn,
5130 path_id,
5131 space_id,
5132 space,
5133 is_multipath_negotiated,
5134 builder,
5135 stats,
5136 );
5137 }
5138 }
5139
5140 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5142 let sequence_number = self.ack_frequency.next_sequence_number();
5143
5144 let config = self.config.ack_frequency_config.as_ref().unwrap();
5146
5147 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5149 path.rtt.get(),
5150 config,
5151 &self.peer_params,
5152 );
5153
5154 let frame = frame::AckFrequency {
5155 sequence: sequence_number,
5156 ack_eliciting_threshold: config.ack_eliciting_threshold,
5157 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5158 reordering_threshold: config.reordering_threshold,
5159 };
5160 builder.write_frame(frame, stats);
5161
5162 self.ack_frequency
5163 .ack_frequency_sent(path_id, pn, max_ack_delay);
5164 }
5165
5166 if builder.frame_space_remaining() > frame::PathChallenge::SIZE_BOUND
5168 && space_id == SpaceId::Data
5169 && path.send_new_challenge
5170 && !self.state.is_closed()
5171 {
5173 path.send_new_challenge = false;
5174
5175 let token = self.rng.random();
5177 let info = paths::SentChallengeInfo {
5178 sent_instant: now,
5179 network_path: path.network_path,
5180 };
5181 path.challenges_sent.insert(token, info);
5182 let challenge = frame::PathChallenge(token);
5183 trace!(frame = %challenge);
5184 builder.write_frame(challenge, stats);
5185 builder.require_padding();
5186 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5187 self.timers.set(
5188 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5189 now + pto,
5190 self.qlog.with_time(now),
5191 );
5192
5193 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5194 space.pending.path_status.insert(path_id);
5196 }
5197
5198 if space_id == SpaceId::Data
5201 && self
5202 .config
5203 .address_discovery_role
5204 .should_report(&self.peer_params.address_discovery_role)
5205 {
5206 let frame = frame::ObservedAddr::new(
5207 path.network_path.remote,
5208 self.next_observed_addr_seq_no,
5209 );
5210 if builder.frame_space_remaining() > frame.size() {
5211 builder.write_frame(frame, stats);
5212
5213 self.next_observed_addr_seq_no =
5214 self.next_observed_addr_seq_no.saturating_add(1u8);
5215 path.observed_addr_sent = true;
5216
5217 space.pending.observed_addr = false;
5218 }
5219 }
5220 }
5221
5222 if builder.frame_space_remaining() > frame::PathResponse::SIZE_BOUND
5224 && space_id == SpaceId::Data
5225 {
5226 if let Some(token) = path.path_responses.pop_on_path(path.network_path) {
5227 let response = frame::PathResponse(token);
5228 trace!(frame = %response);
5229 builder.write_frame(response, stats);
5230 builder.require_padding();
5231
5232 if space_id == SpaceId::Data
5236 && self
5237 .config
5238 .address_discovery_role
5239 .should_report(&self.peer_params.address_discovery_role)
5240 {
5241 let frame = frame::ObservedAddr::new(
5242 path.network_path.remote,
5243 self.next_observed_addr_seq_no,
5244 );
5245 if builder.frame_space_remaining() > frame.size() {
5246 builder.write_frame(frame, stats);
5247
5248 self.next_observed_addr_seq_no =
5249 self.next_observed_addr_seq_no.saturating_add(1u8);
5250 path.observed_addr_sent = true;
5251
5252 space.pending.observed_addr = false;
5253 }
5254 }
5255 }
5256 }
5257
5258 while !path_exclusive_only
5260 && builder.frame_space_remaining() > frame::Crypto::SIZE_BOUND
5261 && !is_0rtt
5262 {
5263 let mut frame = match space.pending.crypto.pop_front() {
5264 Some(x) => x,
5265 None => break,
5266 };
5267
5268 let max_crypto_data_size = builder.frame_space_remaining()
5273 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5275 - 2; let len = frame
5278 .data
5279 .len()
5280 .min(2usize.pow(14) - 1)
5281 .min(max_crypto_data_size);
5282
5283 let data = frame.data.split_to(len);
5284 let offset = frame.offset;
5285 let truncated = frame::Crypto { offset, data };
5286 builder.write_frame(truncated, stats);
5287
5288 if !frame.data.is_empty() {
5289 frame.offset += len as u64;
5290 space.pending.crypto.push_front(frame);
5291 }
5292 }
5293
5294 while !path_exclusive_only
5297 && space_id == SpaceId::Data
5298 && frame::PathAbandon::SIZE_BOUND <= builder.frame_space_remaining()
5299 {
5300 let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5301 break;
5302 };
5303 let frame = frame::PathAbandon {
5304 path_id,
5305 error_code,
5306 };
5307 builder.write_frame(frame, stats);
5308 }
5309
5310 while !path_exclusive_only
5312 && space_id == SpaceId::Data
5313 && frame::PathStatusAvailable::SIZE_BOUND <= builder.frame_space_remaining()
5314 {
5315 let Some(path_id) = space.pending.path_status.pop_first() else {
5316 break;
5317 };
5318 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5319 trace!(%path_id, "discarding queued path status for unknown path");
5320 continue;
5321 };
5322
5323 let seq = path.status.seq();
5324 match path.local_status() {
5325 PathStatus::Available => {
5326 let frame = frame::PathStatusAvailable {
5327 path_id,
5328 status_seq_no: seq,
5329 };
5330 builder.write_frame(frame, stats);
5331 }
5332 PathStatus::Backup => {
5333 let frame = frame::PathStatusBackup {
5334 path_id,
5335 status_seq_no: seq,
5336 };
5337 builder.write_frame(frame, stats);
5338 }
5339 }
5340 }
5341
5342 if space_id == SpaceId::Data
5344 && space.pending.max_path_id
5345 && frame::MaxPathId::SIZE_BOUND <= builder.frame_space_remaining()
5346 {
5347 let frame = frame::MaxPathId(self.local_max_path_id);
5348 builder.write_frame(frame, stats);
5349 space.pending.max_path_id = false;
5350 }
5351
5352 if space_id == SpaceId::Data
5354 && space.pending.paths_blocked
5355 && frame::PathsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5356 {
5357 let frame = frame::PathsBlocked(self.remote_max_path_id);
5358 builder.write_frame(frame, stats);
5359 space.pending.paths_blocked = false;
5360 }
5361
5362 while space_id == SpaceId::Data
5364 && frame::PathCidsBlocked::SIZE_BOUND <= builder.frame_space_remaining()
5365 {
5366 let Some(path_id) = space.pending.path_cids_blocked.pop_first() else {
5367 break;
5368 };
5369 let next_seq = match self.rem_cids.get(&path_id) {
5370 Some(cid_queue) => VarInt(cid_queue.active_seq() + 1),
5371 None => VarInt(0),
5372 };
5373 let frame = frame::PathCidsBlocked { path_id, next_seq };
5374 builder.write_frame(frame, stats);
5375 }
5376
5377 if space_id == SpaceId::Data {
5379 self.streams
5380 .write_control_frames(builder, &mut space.pending, stats);
5381 }
5382
5383 let cid_len = self
5385 .local_cid_state
5386 .values()
5387 .map(|cid_state| cid_state.cid_len())
5388 .max()
5389 .expect("some local CID state must exist");
5390 let new_cid_size_bound =
5391 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5392 while !path_exclusive_only && builder.frame_space_remaining() > new_cid_size_bound {
5393 let issued = match space.pending.new_cids.pop() {
5394 Some(x) => x,
5395 None => break,
5396 };
5397 let retire_prior_to = self
5398 .local_cid_state
5399 .get(&issued.path_id)
5400 .map(|cid_state| cid_state.retire_prior_to())
5401 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5402
5403 let cid_path_id = match is_multipath_negotiated {
5404 true => Some(issued.path_id),
5405 false => {
5406 debug_assert_eq!(issued.path_id, PathId::ZERO);
5407 None
5408 }
5409 };
5410 let frame = frame::NewConnectionId {
5411 path_id: cid_path_id,
5412 sequence: issued.sequence,
5413 retire_prior_to,
5414 id: issued.id,
5415 reset_token: issued.reset_token,
5416 };
5417 builder.write_frame(frame, stats);
5418 }
5419
5420 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5422 while !path_exclusive_only && builder.frame_space_remaining() > retire_cid_bound {
5423 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5424 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => (None, seq),
5425 Some((path_id, seq)) => (Some(path_id), seq),
5426 None => break,
5427 };
5428 let frame = frame::RetireConnectionId { path_id, sequence };
5429 builder.write_frame(frame, stats);
5430 }
5431
5432 let mut sent_datagrams = false;
5434 while !path_exclusive_only
5435 && builder.frame_space_remaining() > Datagram::SIZE_BOUND
5436 && space_id == SpaceId::Data
5437 {
5438 match self.datagrams.write(builder, stats) {
5439 true => {
5440 sent_datagrams = true;
5441 }
5442 false => break,
5443 }
5444 }
5445 if self.datagrams.send_blocked && sent_datagrams {
5446 self.events.push_back(Event::DatagramsUnblocked);
5447 self.datagrams.send_blocked = false;
5448 }
5449
5450 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5451
5452 while let Some(network_path) = space.pending.new_tokens.pop() {
5454 if path_exclusive_only {
5455 break;
5456 }
5457 debug_assert_eq!(space_id, SpaceId::Data);
5458 let ConnectionSide::Server { server_config } = &self.side else {
5459 panic!("NEW_TOKEN frames should not be enqueued by clients");
5460 };
5461
5462 if !network_path.is_probably_same_path(&path.network_path) {
5463 continue;
5468 }
5469
5470 let token = Token::new(
5471 TokenPayload::Validation {
5472 ip: network_path.remote.ip(),
5473 issued: server_config.time_source.now(),
5474 },
5475 &mut self.rng,
5476 );
5477 let new_token = NewToken {
5478 token: token.encode(&*server_config.token_key).into(),
5479 };
5480
5481 if builder.frame_space_remaining() < new_token.size() {
5482 space.pending.new_tokens.push(network_path);
5483 break;
5484 }
5485
5486 builder.write_frame(new_token, stats);
5487 builder.retransmits_mut().new_tokens.push(network_path);
5488 }
5489
5490 if !path_exclusive_only && space_id == SpaceId::Data {
5492 self.streams
5493 .write_stream_frames(builder, self.config.send_fairness, stats);
5494 }
5495
5496 while space_id == SpaceId::Data
5499 && frame::AddAddress::SIZE_BOUND <= builder.frame_space_remaining()
5500 {
5501 if let Some(added_address) = space.pending.add_address.pop_last() {
5502 builder.write_frame(added_address, stats);
5503 } else {
5504 break;
5505 }
5506 }
5507
5508 while space_id == SpaceId::Data
5510 && frame::RemoveAddress::SIZE_BOUND <= builder.frame_space_remaining()
5511 {
5512 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5513 builder.write_frame(removed_address, stats);
5514 } else {
5515 break;
5516 }
5517 }
5518 }
5519
5520 fn populate_acks<'a, 'b>(
5522 now: Instant,
5523 receiving_ecn: bool,
5524 path_id: PathId,
5525 space_id: SpaceId,
5526 space: &mut PacketSpace,
5527 is_multipath_negotiated: bool,
5528 builder: &mut PacketBuilder<'a, 'b>,
5529 stats: &mut FrameStats,
5530 ) {
5531 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5533
5534 debug_assert!(
5535 is_multipath_negotiated || path_id == PathId::ZERO,
5536 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5537 );
5538 if is_multipath_negotiated {
5539 debug_assert!(
5540 space_id == SpaceId::Data || path_id == PathId::ZERO,
5541 "path acks must be sent in 1RTT space (have {space_id:?})"
5542 );
5543 }
5544
5545 let pns = space.for_path(path_id);
5546 let ranges = pns.pending_acks.ranges();
5547 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5548 let ecn = if receiving_ecn {
5549 Some(&pns.ecn_counters)
5550 } else {
5551 None
5552 };
5553
5554 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5555 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5557 let delay = delay_micros >> ack_delay_exp.into_inner();
5558
5559 if is_multipath_negotiated && space_id == SpaceId::Data {
5560 if !ranges.is_empty() {
5561 let frame = frame::PathAck::encoder(path_id, delay, ranges, ecn);
5562 builder.write_frame(frame, stats);
5563 }
5564 } else {
5565 builder.write_frame(frame::Ack::encoder(delay, ranges, ecn), stats);
5566 }
5567 }
5568
/// Shared bookkeeping for closing the connection: logs the closure and resets
/// the connection's timers.
fn close_common(&mut self) {
    trace!("connection closed");
    self.timers.reset();
}
5573
5574 fn set_close_timer(&mut self, now: Instant) {
5575 let pto_max = self.pto_max_path(self.highest_space, true);
5578 self.timers.set(
5579 Timer::Conn(ConnTimer::Close),
5580 now + 3 * pto_max,
5581 self.qlog.with_time(now),
5582 );
5583 }
5584
5585 fn handle_peer_params(
5590 &mut self,
5591 params: TransportParameters,
5592 loc_cid: ConnectionId,
5593 rem_cid: ConnectionId,
5594 now: Instant,
5595 ) -> Result<(), TransportError> {
5596 if Some(self.orig_rem_cid) != params.initial_src_cid
5597 || (self.side.is_client()
5598 && (Some(self.initial_dst_cid) != params.original_dst_cid
5599 || self.retry_src_cid != params.retry_src_cid))
5600 {
5601 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5602 "CID authentication failure",
5603 ));
5604 }
5605 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5606 return Err(TransportError::PROTOCOL_VIOLATION(
5607 "multipath must not use zero-length CIDs",
5608 ));
5609 }
5610
5611 self.set_peer_params(params);
5612 self.qlog.emit_peer_transport_params_received(self, now);
5613
5614 Ok(())
5615 }
5616
5617 fn set_peer_params(&mut self, params: TransportParameters) {
5618 self.streams.set_params(¶ms);
5619 self.idle_timeout =
5620 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5621 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5622
5623 if let Some(ref info) = params.preferred_address {
5624 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5626 path_id: None,
5627 sequence: 1,
5628 id: info.connection_id,
5629 reset_token: info.stateless_reset_token,
5630 retire_prior_to: 0,
5631 })
5632 .expect(
5633 "preferred address CID is the first received, and hence is guaranteed to be legal",
5634 );
5635 let remote = self.path_data(PathId::ZERO).network_path.remote;
5636 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5637 }
5638 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5639
5640 let mut multipath_enabled = None;
5641 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5642 self.config.get_initial_max_path_id(),
5643 params.initial_max_path_id,
5644 ) {
5645 self.local_max_path_id = local_max_path_id;
5647 self.remote_max_path_id = remote_max_path_id;
5648 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5649 debug!(%initial_max_path_id, "multipath negotiated");
5650 multipath_enabled = Some(initial_max_path_id);
5651 }
5652
5653 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5654 self.config
5655 .max_remote_nat_traversal_addresses
5656 .zip(params.max_remote_nat_traversal_addresses)
5657 {
5658 if let Some(max_initial_paths) =
5659 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5660 {
5661 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5662 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5663 self.iroh_hp =
5664 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5665 debug!(
5666 %max_remote_addresses, %max_local_addresses,
5667 "iroh hole punching negotiated"
5668 );
5669
5670 match self.side() {
5671 Side::Client => {
5672 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5673 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5676 } else if max_local_addresses as u64
5677 > params.active_connection_id_limit.into_inner()
5678 {
5679 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5683 }
5684 }
5685 Side::Server => {
5686 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5687 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5688 }
5689 }
5690 }
5691 } else {
5692 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5693 }
5694 }
5695
5696 self.peer_params = params;
5697 let peer_max_udp_payload_size =
5698 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5699 self.path_data_mut(PathId::ZERO)
5700 .mtud
5701 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5702 }
5703
5704 fn decrypt_packet(
5706 &mut self,
5707 now: Instant,
5708 path_id: PathId,
5709 packet: &mut Packet,
5710 ) -> Result<Option<u64>, Option<TransportError>> {
5711 let result = packet_crypto::decrypt_packet_body(
5712 packet,
5713 path_id,
5714 &self.spaces,
5715 self.zero_rtt_crypto.as_ref(),
5716 self.key_phase,
5717 self.prev_crypto.as_ref(),
5718 self.next_crypto.as_ref(),
5719 )?;
5720
5721 let result = match result {
5722 Some(r) => r,
5723 None => return Ok(None),
5724 };
5725
5726 if result.outgoing_key_update_acked {
5727 if let Some(prev) = self.prev_crypto.as_mut() {
5728 prev.end_packet = Some((result.number, now));
5729 self.set_key_discard_timer(now, packet.header.space());
5730 }
5731 }
5732
5733 if result.incoming_key_update {
5734 trace!("key update authenticated");
5735 self.update_keys(Some((result.number, now)), true);
5736 self.set_key_discard_timer(now, packet.header.space());
5737 }
5738
5739 Ok(Some(result.number))
5740 }
5741
/// Rotates the 1-RTT packet keys.
///
/// The keys stored in `next_crypto` become the current data-space packet keys, freshly
/// derived keys take their place in `next_crypto`, and the keys that were current are
/// kept in `prev_crypto` together with `end_packet`. `remote` is stored as
/// `update_unacked` on the previous-keys record.
///
/// # Panics
///
/// Panics if no further 1-RTT keys are available, or if the data space crypto or
/// `next_crypto` are not set.
fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
    trace!("executing key update");
    // Derive the keys for the key phase after the one we are about to enter.
    let new = self
        .crypto
        .next_1rtt_keys()
        .expect("only called for `Data` packets");
    // Leave a safety margin below the confidentiality limit of the new local key.
    self.key_phase_size = new
        .local
        .confidentiality_limit()
        .saturating_sub(KEY_UPDATE_MARGIN);
    // Inner replace: `new` goes into `next_crypto`, yielding the former next keys.
    // Outer replace: those become the current packet keys, yielding the former current keys.
    let old = mem::replace(
        &mut self.spaces[SpaceId::Data]
            .crypto
            .as_mut()
            .unwrap()
            .packet,
        mem::replace(self.next_crypto.as_mut().unwrap(), new),
    );
    // Nothing has been sent with the new keys yet, on any path.
    self.spaces[SpaceId::Data]
        .iter_paths_mut()
        .for_each(|s| s.sent_with_keys = 0);
    self.prev_crypto = Some(PrevCrypto {
        crypto: old,
        end_packet,
        update_unacked: remote,
    });
    self.key_phase = !self.key_phase;
}
5773
/// Returns `true` when the peer's transport parameters include `min_ack_delay`.
fn peer_supports_ack_frequency(&self) -> bool {
    self.peer_params.min_ack_delay.is_some()
}
5777
5778 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5783 debug_assert_eq!(
5784 self.highest_space,
5785 SpaceId::Data,
5786 "immediate ack must be written in the data space"
5787 );
5788 self.spaces[self.highest_space]
5789 .for_path(path_id)
5790 .immediate_ack_pending = true;
5791 }
5792
/// Test helper: removes header protection from and decrypts the packet carried by a
/// datagram `event`, returning the plaintext payload.
///
/// Returns `None` for non-datagram events and whenever header unprotection or payload
/// decryption does not yield a packet.
///
/// # Panics
///
/// Panics if the datagram contains coalesced packets.
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let ConnectionEventInner::Datagram(DatagramConnectionEvent {
        path_id,
        first_decode,
        remaining,
        ..
    }) = &event.0
    else {
        return None;
    };

    assert!(
        remaining.is_none(),
        "Packets should never be coalesced in tests"
    );

    let unprotected = packet_crypto::unprotect_header(
        first_decode.clone(),
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.peer_params.stateless_reset_token,
    )?;
    let mut packet = unprotected.packet?;

    let decrypted = packet_crypto::decrypt_packet_body(
        &mut packet,
        *path_id,
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.key_phase,
        self.prev_crypto.as_ref(),
        self.next_crypto.as_ref(),
    );
    // Only success matters here; the decryption result itself is not needed.
    decrypted.ok()?;

    Some(packet.payload.to_vec())
}
5831
/// Test helper: the number of bytes currently in flight on path zero.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    self.path_data(PathId::ZERO).in_flight.bytes
}
5839
/// Test helper: the part of path zero's congestion window not occupied by in-flight
/// bytes, saturating at zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    let path_zero = self.path_data(PathId::ZERO);
    let window = path_zero.congestion.window();
    window.saturating_sub(path_zero.in_flight.bytes)
}
5848
/// Test helper: reports whether the connection has nothing scheduled except idling.
///
/// Keep-alive, path keep-alive, CID push, and key discard timers are ignored. The
/// connection counts as idle when no other timer is set, or when the earliest of the
/// remaining timers is the idle timer.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let earliest_relevant = self
        .timers
        .values()
        .into_iter()
        .filter(|(timer, _)| {
            let ignored = matches!(
                timer,
                Timer::Conn(ConnTimer::KeepAlive)
                    | Timer::PerPath(_, PathTimer::PathKeepAlive)
                    | Timer::Conn(ConnTimer::PushNewCid)
                    | Timer::Conn(ConnTimer::KeyDiscard)
            );
            !ignored
        })
        .min_by_key(|(_, time)| *time);

    match earliest_relevant {
        Some((timer, _)) => timer == Timer::Conn(ConnTimer::Idle),
        None => true,
    }
}
5867
/// Test helper: the `sending_ecn` flag of path zero.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    self.path_data(PathId::ZERO).sending_ecn
}
5873
/// Test helper: the `total_recvd` counter of path zero.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    self.path_data(PathId::ZERO).total_recvd
}
5879
/// Test helper: the pair returned by `active_seq()` for the local CID state of path zero.
///
/// # Panics
///
/// Panics if there is no local CID state for path zero.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    self.local_cid_state
        .get(&PathId::ZERO)
        .unwrap()
        .active_seq()
}
5887
/// Test helper: the pair returned by `active_seq()` for the local CID state of the path
/// with the given numeric id.
///
/// # Panics
///
/// Panics if there is no local CID state for that path; thanks to `#[track_caller]`
/// the panic location is attributed to the caller.
#[cfg(test)]
#[track_caller]
pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
    self.local_cid_state
        .get(&PathId(path_id))
        .unwrap()
        .active_seq()
}
5896
/// Test helper: assigns retire sequence `v` to the local CID state of path zero and
/// queues a `NeedIdentifiers` endpoint event carrying the value returned by that
/// assignment.
///
/// # Panics
///
/// Panics if there is no local CID state for path zero.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let cid_state = self.local_cid_state.get_mut(&PathId::ZERO).unwrap();
    let needed = cid_state.assign_retire_seq(v);
    let request = EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, needed);
    self.endpoint_events.push_back(request);
}
5909
/// Test helper: the active sequence number of the remote CID queue of path zero.
///
/// # Panics
///
/// Panics if there is no remote CID queue for path zero.
#[cfg(test)]
pub(crate) fn active_rem_cid_seq(&self) -> u64 {
    self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
}
5915
/// Test helper: the current MTU of the given path.
#[cfg(test)]
pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
    self.path_data(path_id).current_mtu()
}
5921
/// Test helper: sets `send_new_challenge` on every known path.
#[cfg(test)]
pub(crate) fn trigger_path_validation(&mut self) {
    self.paths
        .values_mut()
        .for_each(|path_state| path_state.data.send_new_challenge = true);
}
5929
5930 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
5941 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
5942 path.data.send_new_challenge
5943 || path
5944 .prev
5945 .as_ref()
5946 .is_some_and(|(_, path)| path.send_new_challenge)
5947 || !path.data.path_responses.is_empty()
5948 });
5949 let other = self.streams.can_send_stream_data()
5950 || self
5951 .datagrams
5952 .outgoing
5953 .front()
5954 .is_some_and(|x| x.size(true) <= max_size);
5955 SendableFrames {
5956 acks: false,
5957 other,
5958 close: false,
5959 path_exclusive,
5960 }
5961 }
5962
/// Terminates the connection immediately: runs the common close bookkeeping, moves the
/// state to drained with `reason`, and notifies the endpoint via a `Drained` event.
fn kill(&mut self, reason: ConnectionError) {
    self.close_common();
    self.state.move_to_drained(Some(reason));
    self.endpoint_events.push_back(EndpointEventInner::Drained);
}
5969
5970 pub fn current_mtu(&self) -> u16 {
5977 self.paths
5978 .iter()
5979 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
5980 .map(|(_path_id, path_state)| path_state.data.current_mtu())
5981 .min()
5982 .expect("There is always at least one available path")
5983 }
5984
5985 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
5992 let pn_len = PacketNumber::new(
5993 pn,
5994 self.spaces[SpaceId::Data]
5995 .for_path(path)
5996 .largest_acked_packet
5997 .unwrap_or(0),
5998 )
5999 .len();
6000
6001 1 + self
6003 .rem_cids
6004 .get(&path)
6005 .map(|cids| cids.active().len())
6006 .unwrap_or(20) + pn_len
6008 + self.tag_len_1rtt()
6009 }
6010
6011 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6012 let pn_len = 4;
6013
6014 let cid_len = self
6015 .rem_cids
6016 .values()
6017 .map(|cids| cids.active().len())
6018 .max()
6019 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6023 }
6024
6025 fn tag_len_1rtt(&self) -> usize {
6026 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6027 Some(crypto) => Some(&*crypto.packet.local),
6028 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6029 };
6030 key.map_or(16, |x| x.tag_len())
6034 }
6035
6036 fn on_path_validated(&mut self, path_id: PathId) {
6038 self.path_data_mut(path_id).validated = true;
6039 let ConnectionSide::Server { server_config } = &self.side else {
6040 return;
6041 };
6042 let network_path = self.path_data(path_id).network_path;
6043 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6044 new_tokens.clear();
6045 for _ in 0..server_config.validation_token.sent {
6046 new_tokens.push(network_path);
6047 }
6048 }
6049
6050 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6052 if let Some(path) = self.paths.get_mut(&path_id) {
6053 path.data.status.remote_update(status, status_seq_no);
6054 } else {
6055 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6056 }
6057 self.events.push_back(
6058 PathEvent::RemoteStatus {
6059 id: path_id,
6060 status,
6061 }
6062 .into(),
6063 );
6064 }
6065
6066 fn max_path_id(&self) -> Option<PathId> {
6075 if self.is_multipath_negotiated() {
6076 Some(self.remote_max_path_id.min(self.local_max_path_id))
6077 } else {
6078 None
6079 }
6080 }
6081
6082 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6084 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6085 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6086 };
6087 Ok(())
6088 }
6089
6090 pub fn remove_nat_traversal_address(
6094 &mut self,
6095 address: SocketAddr,
6096 ) -> Result<(), iroh_hp::Error> {
6097 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6098 self.spaces[SpaceId::Data]
6099 .pending
6100 .remove_address
6101 .insert(removed);
6102 }
6103 Ok(())
6104 }
6105
/// Returns the local NAT traversal addresses known to the hole-punching state.
///
/// # Errors
///
/// Propagates any error returned by the hole-punching state.
pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
    self.iroh_hp.get_local_nat_traversal_addresses()
}
6110
6111 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6113 Ok(self
6114 .iroh_hp
6115 .client_side()?
6116 .get_remote_nat_traversal_addresses())
6117 }
6118
6119 fn open_nat_traversal_path(
6127 &mut self,
6128 now: Instant,
6129 (ip, port): (IpAddr, u16),
6130 ipv6: bool,
6131 ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6132 let remote = match ip {
6134 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6135 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6136 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6137 IpAddr::V6(_) => {
6138 trace!("not using IPv6 nat candidate for IPv4 socket");
6139 return Ok(None);
6140 }
6141 };
6142 let network_path = FourTuple {
6147 remote,
6148 local_ip: None,
6149 };
6150 match self.open_path_ensure(network_path, PathStatus::Backup, now) {
6151 Ok((path_id, path_was_known)) => {
6152 if path_was_known {
6153 trace!(%path_id, %remote, "nat traversal: path existed for remote");
6154 }
6155 Ok(Some((path_id, remote, path_was_known)))
6156 }
6157 Err(e) => {
6158 debug!(%remote, %e, "nat traversal: failed to probe remote");
6159 Err(e)
6160 }
6161 }
6162 }
6163
/// Starts a new NAT traversal round and returns the remote addresses for which a new
/// path was opened.
///
/// The round's reach-out data is queued for sending in the data space, paths from the
/// previous round that were never validated are closed, and a path is opened for every
/// address to probe.
///
/// # Errors
///
/// - `iroh_hp::Error::Closed` if the connection is closed.
/// - Any error from obtaining the client-side hole-punching state or from starting the
///   round there.
/// - `iroh_hp::Error::Multipath` with the first path error encountered, if opening a
///   path failed and no new path was opened.
pub fn initiate_nat_traversal_round(
    &mut self,
    now: Instant,
) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
    if self.state.is_closed() {
        return Err(iroh_hp::Error::Closed);
    }

    let client_state = self.iroh_hp.client_side_mut()?;
    let iroh_hp::NatTraversalRound {
        new_round,
        reach_out_at,
        addresses_to_probe,
        prev_round_path_ids,
    } = client_state.initiate_nat_traversal_round()?;

    self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));

    // Abandon paths of the previous round that never got validated.
    for path_id in prev_round_path_ids {
        let validated = self
            .path(path_id)
            .map(|path| path.validated)
            .unwrap_or(false);

        if !validated {
            // Failure to close is deliberately ignored.
            let _ = self.close_path(
                now,
                path_id,
                TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
            );
        }
    }

    // First error seen while opening paths, if any.
    let mut err = None;

    let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
    let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
    // Use IPv6 addressing if any existing path has an IPv6 remote.
    let ipv6 = self
        .paths
        .values()
        .any(|p| p.data.network_path.remote.is_ipv6());

    for (id, address) in addresses_to_probe {
        match self.open_nat_traversal_path(now, address, ipv6) {
            Ok(None) => {}
            Ok(Some((path_id, remote, path_was_known))) => {
                // Only paths newly opened in this round are tracked and reported.
                if !path_was_known {
                    path_ids.push(path_id);
                    probed_addresses.push(remote);
                }
            }
            Err(e) => {
                self.iroh_hp
                    .client_side_mut()
                    .expect("validated")
                    .report_in_continuation(id, e);
                err.get_or_insert(e);
            }
        }
    }

    if let Some(err) = err {
        // Only fail the round when nothing at all could be probed.
        if probed_addresses.is_empty() {
            return Err(iroh_hp::Error::Multipath(err));
        }
    }

    self.iroh_hp
        .client_side_mut()
        .expect("connection side validated")
        .set_round_path_ids(path_ids);

    Ok(probed_addresses)
}
6250
6251 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6256 let client_state = self.iroh_hp.client_side_mut().ok()?;
6257 let (id, address) = client_state.continue_nat_traversal_round()?;
6258 let ipv6 = self
6259 .paths
6260 .values()
6261 .any(|p| p.data.network_path.remote.is_ipv6());
6262 let open_result = self.open_nat_traversal_path(now, address, ipv6);
6263 let client_state = self.iroh_hp.client_side_mut().expect("validated");
6264 match open_result {
6265 Ok(None) => Some(true),
6266 Ok(Some((path_id, _remote, path_was_known))) => {
6267 if !path_was_known {
6268 client_state.add_round_path_id(path_id);
6269 }
6270 Some(true)
6271 }
6272 Err(e) => {
6273 client_state.report_in_continuation(id, e);
6274 Some(false)
6275 }
6276 }
6277 }
6278}
6279
6280impl fmt::Debug for Connection {
6281 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
6282 f.debug_struct("Connection")
6283 .field("handshake_cid", &self.handshake_cid)
6284 .finish()
6285 }
6286}
6287
/// Whether, and why, a path is blocked.
// NOTE(review): the variant descriptions below are inferred from the variant names;
// confirm against the code that produces these values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Presumably blocked by the anti-amplification limit.
    AntiAmplification,
    /// Presumably blocked by congestion control.
    Congestion,
    /// Presumably blocked by the pacer.
    Pacing,
}
6295
/// Side-specific state of a connection.
enum ConnectionSide {
    /// State kept when acting as the client.
    Client {
        /// Token taken from `token_store` for `server_name` when the connection side was
        /// created; empty if the store had none.
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server, used as the key into `token_store`.
        server_name: String,
    },
    /// State kept when acting as the server.
    Server {
        /// The server configuration.
        server_config: Arc<ServerConfig>,
    },
}
6308
6309impl ConnectionSide {
6310 fn remote_may_migrate(&self, state: &State) -> bool {
6311 match self {
6312 Self::Server { server_config } => server_config.migration,
6313 Self::Client { .. } => {
6314 if let Some(hs) = state.as_handshake() {
6315 hs.allow_server_migration
6316 } else {
6317 false
6318 }
6319 }
6320 }
6321 }
6322
6323 fn is_client(&self) -> bool {
6324 self.side().is_client()
6325 }
6326
6327 fn is_server(&self) -> bool {
6328 self.side().is_server()
6329 }
6330
6331 fn side(&self) -> Side {
6332 match *self {
6333 Self::Client { .. } => Side::Client,
6334 Self::Server { .. } => Side::Server,
6335 }
6336 }
6337}
6338
6339impl From<SideArgs> for ConnectionSide {
6340 fn from(side: SideArgs) -> Self {
6341 match side {
6342 SideArgs::Client {
6343 token_store,
6344 server_name,
6345 } => Self::Client {
6346 token: token_store.take(&server_name).unwrap_or_default(),
6347 token_store,
6348 server_name,
6349 },
6350 SideArgs::Server {
6351 server_config,
6352 pref_addr_cid: _,
6353 path_validated: _,
6354 } => Self::Server { server_config },
6355 }
6356 }
6357}
6358
/// Side-specific arguments from which a connection's [`ConnectionSide`] is built.
pub(crate) enum SideArgs {
    /// Arguments for a client-side connection.
    Client {
        /// Store to take a token from.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server-side connection.
    Server {
        /// The server configuration.
        server_config: Arc<ServerConfig>,
        /// Connection ID for the preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already validated.
        path_validated: bool,
    },
}
6371
6372impl SideArgs {
6373 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6374 match *self {
6375 Self::Client { .. } => None,
6376 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6377 }
6378 }
6379
6380 pub(crate) fn path_validated(&self) -> bool {
6381 match *self {
6382 Self::Client { .. } => true,
6383 Self::Server { path_validated, .. } => path_validated,
6384 }
6385 }
6386
6387 pub(crate) fn side(&self) -> Side {
6388 match *self {
6389 Self::Client { .. } => Side::Client,
6390 Self::Server { .. } => Side::Server,
6391 }
6392 }
6393}
6394
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error occurred; displayed as the wrapped error.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The connection was aborted by the peer with a connection close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The connection was closed by the peer with an application close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed.
    // NOTE(review): the variant name suggests a local close; confirm at the use sites.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
6429
/// Maps a received close frame to the matching [`ConnectionError`] variant.
impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}
6438
6439impl From<ConnectionError> for io::Error {
6441 fn from(x: ConnectionError) -> Self {
6442 use ConnectionError::*;
6443 let kind = match x {
6444 TimedOut => io::ErrorKind::TimedOut,
6445 Reset => io::ErrorKind::ConnectionReset,
6446 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6447 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6448 io::ErrorKind::Other
6449 }
6450 };
6451 Self::new(kind, x)
6452 }
6453}
6454
6455#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6458pub enum PathError {
6459 #[error("multipath extension not negotiated")]
6461 MultipathNotNegotiated,
6462 #[error("the server side may not open a path")]
6464 ServerSideNotAllowed,
6465 #[error("maximum number of concurrent paths reached")]
6467 MaxPathIdReached,
6468 #[error("remoted CIDs exhausted")]
6470 RemoteCidsExhausted,
6471 #[error("path validation failed")]
6473 ValidationFailed,
6474 #[error("invalid remote address")]
6476 InvalidRemoteAddress(SocketAddr),
6477}
6478
/// Errors that can occur when closing a path.
// NOTE(review): variant descriptions are inferred from the names and messages; confirm
// against the code that returns them.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// Presumably: the path is already closed.
    #[error("closed path")]
    ClosedPath,
    /// Presumably: the path is the last open path.
    #[error("last open path")]
    LastOpenPath,
}
6489
/// Error indicating that the multipath extension was not negotiated.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private field: prevents construction outside this module.
    _private: (),
}
6496
/// Events of interest to the application, queued on the connection.
// NOTE(review): where a variant is not produced in the visible code, its description is
// inferred from the variant name.
#[derive(Debug)]
pub enum Event {
    /// Handshake data is ready.
    HandshakeDataReady,
    /// The connection has been established.
    Connected,
    /// The handshake has been confirmed.
    HandshakeConfirmed,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// A datagram was received.
    DatagramReceived,
    /// Datagrams were written while sending had been blocked; sending is unblocked again.
    DatagramsUnblocked,
    /// A path-related event.
    Path(PathEvent),
    /// A NAT traversal event.
    NatTraversal(iroh_hp::Event),
}
6524
/// Wraps a [`PathEvent`] in [`Event::Path`].
impl From<PathEvent> for Event {
    fn from(source: PathEvent) -> Self {
        Self::Path(source)
    }
}
6530
6531fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6532 Duration::from_micros(params.max_ack_delay.0 * 1000)
6533}
6534
/// Upper bound for a backoff exponent.
// NOTE(review): the use site is outside this view; presumably this caps PTO backoff.
const MAX_BACKOFF_EXPONENT: u32 = 16;

/// Minimum space required in a packet: the largest Handshake/0-RTT header plus 32 bytes.
// NOTE(review): what the extra 32 bytes account for is not visible here.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

/// Largest possible size of a Handshake or 0-RTT packet header.
///
/// Presumably the terms are, in order: flags byte, version, destination CID length and
/// CID, source CID length and CID, the length field (sized for `u16::MAX`), and the
/// packet number — TODO confirm against the header encoder.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Margin subtracted from a key's confidentiality limit when computing `key_phase_size`
/// during a key update.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6559
/// Record of the frames written into a packet.
#[derive(Default)]
struct SentFrames {
    /// Frames that are recorded for retransmission.
    retransmits: ThinRetransmits,
    /// Per path, the largest packet number acknowledged by an ACK written to the packet.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the stream frames written to the packet.
    stream_frames: StreamMetaVec,
    /// Whether the packet contains frames that are recorded only as "not retransmitted"
    /// (e.g. pings, path challenges/responses, datagrams).
    non_retransmits: bool,
    /// Whether the packet requires padding.
    // NOTE(review): not written in the visible code; confirm where this is set.
    requires_padding: bool,
}
6571
impl SentFrames {
    /// Whether the packet contains at least one ACK and nothing else that was recorded:
    /// no non-retransmittable frames, no stream frames, and no retransmittable frames.
    fn is_ack_only(&self, streams: &StreamsState) -> bool {
        !self.largest_acked.is_empty()
            && !self.non_retransmits
            && self.stream_frames.is_empty()
            && self.retransmits.is_empty(streams)
    }

    /// Mutable access to the retransmit record, creating it on first use.
    fn retransmits_mut(&mut self) -> &mut Retransmits {
        self.retransmits.get_or_create()
    }

    /// Records that `frame` was written to the packet.
    ///
    /// ACK frames update `largest_acked`, stream frames are collected in
    /// `stream_frames`, most other frames are stored in the retransmit record, and the
    /// remaining ones only set `non_retransmits` or are ignored.
    fn record_sent_frame(&mut self, frame: frame::EncodableFrame<'_>) {
        use frame::EncodableFrame::*;
        match frame {
            PathAck(path_ack_encoder) => {
                if let Some(max) = path_ack_encoder.ranges.max() {
                    self.largest_acked.insert(path_ack_encoder.path_id, max);
                }
            }
            // Plain ACKs carry no path id; they are recorded for path zero.
            Ack(ack_encoder) => {
                if let Some(max) = ack_encoder.ranges.max() {
                    self.largest_acked.insert(PathId::ZERO, max);
                }
            }
            // Nothing is recorded for close frames.
            Close(_) => {}
            PathResponse(_) => self.non_retransmits = true,
            HandshakeDone(_) => self.retransmits_mut().handshake_done = true,
            // All reach-out addresses of a packet are grouped under the round of the first one.
            ReachOut(frame::ReachOut { round, ip, port }) => self
                .retransmits_mut()
                .reach_out
                .get_or_insert_with(|| (round, Vec::new()))
                .1
                .push((ip, port)),
            ObservedAddr(_) => self.retransmits_mut().observed_addr = true,
            Ping(_) => self.non_retransmits = true,
            ImmediateAck(_) => self.non_retransmits = true,
            AckFrequency(_) => self.retransmits_mut().ack_frequency = true,
            PathChallenge(_) => self.non_retransmits = true,
            Crypto(crypto) => self.retransmits_mut().crypto.push_back(crypto),
            // Keeps the error code already recorded for the path, if there is one.
            PathAbandon(path_abandon) => {
                self.retransmits_mut()
                    .path_abandon
                    .entry(path_abandon.path_id)
                    .or_insert(path_abandon.error_code);
            }
            PathStatusAvailable(frame::PathStatusAvailable { path_id, .. })
            | PathStatusBackup(frame::PathStatusBackup { path_id, .. }) => {
                self.retransmits_mut().path_status.insert(path_id);
            }
            MaxPathId(_) => self.retransmits_mut().max_path_id = true,
            PathsBlocked(_) => self.retransmits_mut().paths_blocked = true,
            PathCidsBlocked(path_cids_blocked) => {
                self.retransmits_mut()
                    .path_cids_blocked
                    .insert(path_cids_blocked.path_id);
            }
            ResetStream(reset) => self
                .retransmits_mut()
                .reset_stream
                .push((reset.id, reset.error_code)),
            StopSending(stop_sending) => self.retransmits_mut().stop_sending.push(stop_sending),
            NewConnectionId(new_cid) => self.retransmits_mut().new_cids.push(new_cid.issued()),
            // A missing path id is recorded as the default `PathId`.
            RetireConnectionId(retire_cid) => self
                .retransmits_mut()
                .retire_cids
                .push((retire_cid.path_id.unwrap_or_default(), retire_cid.sequence)),
            Datagram(_) => self.non_retransmits = true,
            // Nothing is recorded here for NEW_TOKEN frames.
            NewToken(_) => {}
            AddAddress(add_address) => {
                self.retransmits_mut().add_address.insert(add_address);
            }
            RemoveAddress(remove_address) => {
                self.retransmits_mut().remove_address.insert(remove_address);
            }
            StreamMeta(stream_meta_encoder) => self.stream_frames.push(stream_meta_encoder.meta),
            MaxData(_) => self.retransmits_mut().max_data = true,
            MaxStreamData(max) => {
                self.retransmits_mut().max_stream_data.insert(max.id);
            }
            MaxStreams(max_streams) => {
                self.retransmits_mut().max_stream_id[max_streams.dir as usize] = true
            }
        }
    }
}
6659
6660fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6668 match (x, y) {
6669 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6670 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6671 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6672 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6673 }
6674}
6675
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks a table of idle-timeout pairs against the expected negotiated value, passing each
    /// pair in both argument orders to verify that the order does not matter.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        // Small constructors keep the table below compact.
        let timeout = |millis: u64| Some(VarInt(millis));
        let negotiated = |millis: u64| Some(Duration::from_millis(millis));

        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, timeout(0), None),
            (None, timeout(2), negotiated(2)),
            (timeout(0), timeout(0), None),
            (timeout(2), timeout(0), negotiated(2)),
            (timeout(1), timeout(4), negotiated(1)),
        ];

        for (left, right, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(left, right), expected);
            assert_eq!(negotiate_max_idle_timeout(right, left), expected);
        }
    }
}