1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23 TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 coding::BufMutExt,
27 config::{ServerConfig, TransportConfig},
28 congestion::Controller,
29 connection::{
30 qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
31 spaces::LostPacket,
32 timer::{ConnTimer, PathTimer},
33 },
34 crypto::{self, KeyPair, Keys, PacketKey},
35 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
36 iroh_hp,
37 packet::{
38 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
39 PacketNumber, PartialDecode, SpaceId,
40 },
41 range_set::ArrayRangeSet,
42 shared::{
43 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44 EndpointEvent, EndpointEventInner,
45 },
46 token::{ResetToken, Token, TokenPayload},
47 transport_parameters::TransportParameters,
48};
49
50mod ack_frequency;
51use ack_frequency::AckFrequencyState;
52
53mod assembler;
54pub use assembler::Chunk;
55
56mod cid_state;
57use cid_state::CidState;
58
59mod datagrams;
60use datagrams::DatagramState;
61pub use datagrams::{Datagrams, SendDatagramError};
62
63mod mtud;
64mod pacing;
65
66mod packet_builder;
67use packet_builder::{PacketBuilder, PadDatagram};
68
69mod packet_crypto;
70use packet_crypto::{PrevCrypto, ZeroRttCrypto};
71
72mod paths;
73pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
74use paths::{PathData, PathState};
75
76pub(crate) mod qlog;
77
78mod send_buffer;
79
80mod spaces;
81#[cfg(fuzzing)]
82pub use spaces::Retransmits;
83#[cfg(not(fuzzing))]
84use spaces::Retransmits;
85use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
86
87mod stats;
88pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
89
90mod streams;
91#[cfg(fuzzing)]
92pub use streams::StreamsState;
93#[cfg(not(fuzzing))]
94use streams::StreamsState;
95pub use streams::{
96 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
97 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
98};
99
100mod timer;
101use timer::{Timer, TimerTable};
102
103mod transmit_buf;
104use transmit_buf::TransmitBuf;
105
106mod state;
107
108#[cfg(not(fuzzing))]
109use state::State;
110#[cfg(fuzzing)]
111pub use state::State;
112use state::StateType;
113
/// Protocol state for a single connection.
///
/// Bundles the per-path state, the packet number spaces, stream and datagram state, timers,
/// and the queues of events handed to the application (`poll`) and to the endpoint
/// (`poll_endpoint_events`). Outgoing datagrams are produced by `poll_transmit`.
pub struct Connection {
    /// Endpoint-level configuration, shared via `Arc`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration for this connection, shared via `Arc`.
    config: Arc<TransportConfig>,
    /// Connection-local random number generator, seeded from the `rng_seed` given to `new`.
    rng: StdRng,
    /// Cryptographic session; `new` uses it to derive the keys of the Initial space.
    crypto: Box<dyn crypto::Session>,
    /// Initialised in `new` from the `loc_cid` argument.
    handshake_cid: ConnectionId,
    /// Initialised in `new` from the `rem_cid` argument.
    rem_handshake_cid: ConnectionId,
    /// Local IP address, if known; reported as `src_ip` on outgoing `Transmit`s.
    local_ip: Option<IpAddr>,
    /// State of every known path, keyed (and therefore ordered) by path id.
    ///
    /// `new` always inserts an entry for `PathId::ZERO`.
    paths: BTreeMap<PathId, PathState>,
    /// Counter bumped (wrapping) for every path created by `ensure_path` and handed to
    /// `PathData::new`; the initial path is created with `0`.
    path_counter: u64,
    /// Forwarded to `PathData::new` for every path.
    /// NOTE(review): presumably gates path MTU discovery — confirm against `PathData`.
    allow_mtud: bool,
    /// Connection state machine; starts in the handshake state.
    state: State,
    /// Side-specific state (client or server).
    side: ConnectionSide,
    /// NOTE(review): presumably whether 0-RTT may be used — not established by the visible code.
    zero_rtt_enabled: bool,
    /// 0-RTT key material, if any. Starts as `None`.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// NOTE(review): presumably the current 1-RTT key phase bit — confirm.
    key_phase: bool,
    /// Initialised to a random value in `10..1000`.
    /// NOTE(review): presumably a packet-count threshold for key updates — confirm.
    key_phase_size: u64,
    /// Transport parameters of the peer; defaults until the real ones are known.
    peer_params: TransportParameters,
    /// Initialised in `new` from the `rem_cid` argument.
    orig_rem_cid: ConnectionId,
    /// Initialised in `new` from the `init_cid` argument, which also seeds the Initial keys.
    initial_dst_cid: ConnectionId,
    /// Starts as `None`.
    /// NOTE(review): presumably the source CID of a received Retry packet — confirm.
    retry_src_cid: Option<ConnectionId>,
    /// Application-facing events, drained first by `poll`.
    events: VecDeque<Event>,
    /// Endpoint-facing events, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Set in `new` when `config.allow_spin` is on and a 7-in-8 random draw succeeds.
    spin_enabled: bool,
    /// NOTE(review): presumably the current value of the latency spin bit — confirm.
    spin: bool,
    /// Packet spaces in the order Initial, Handshake, Data.
    spaces: [PacketSpace; 3],
    /// Starts at `SpaceId::Initial`.
    /// NOTE(review): presumably the highest space for which keys are available — confirm.
    highest_space: SpaceId,
    /// Starts as `None`; `poll_transmit` clears its `update_unacked` flag when building a packet.
    prev_crypto: Option<PrevCrypto>,
    /// Starts as `None`.
    /// NOTE(review): presumably the keys prepared for the next key update — confirm.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Starts as `false`.
    accepted_0rtt: bool,
    /// Starts as `true`.
    /// NOTE(review): presumably whether sending may re-arm the idle timer — confirm.
    permit_idle_reset: bool,
    /// Idle timeout derived from `config.max_idle_timeout` (milliseconds); `None` when that is
    /// unset or zero.
    idle_timeout: Option<Duration>,
    /// All timers of this connection; `poll_timeout` peeks into it.
    timers: TimerTable,
    /// Starts at `0`.
    /// NOTE(review): presumably counts packets that failed authentication — confirm.
    authentication_failures: u64,

    /// Whether a close packet still needs to be sent.
    ///
    /// While closed or draining, `poll_transmit` only produces output when this is set, and it
    /// clears the flag once the close has been written in the highest space.
    close: bool,

    /// ACK frequency state, seeded from the max ACK delay of default transport parameters.
    ack_frequency: AckFrequencyState,

    /// Passed to `populate_acks` when writing ACKs. Starts as `false`.
    receiving_ecn: bool,
    /// Starts at `0`.
    /// NOTE(review): presumably the number of successfully authenticated packets — confirm.
    total_authed_packets: u64,
    /// Updated by `poll_transmit`: `true` when it had nothing to send and was not blocked by
    /// congestion control (and unconditionally when returning early in a closed state).
    app_limited: bool,

    /// Starts at zero.
    /// NOTE(review): presumably the sequence number of the next OBSERVED_ADDRESS frame — confirm.
    next_observed_addr_seq_no: VarInt,

    /// Stream state, exposed through `streams`, `recv_stream` and `send_stream`.
    streams: StreamsState,
    /// Remote connection ids per path; a path without an entry cannot be sent on.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// State of the locally issued connection ids, per path.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// Datagram state.
    datagrams: DatagramState,
    /// Connection-wide statistics.
    stats: ConnectionStats,
    /// Per-path statistics, created lazily on first use.
    path_stats: FxHashMap<PathId, PathStats>,
    /// Protocol version passed to `new`.
    version: u32,

    /// Starts at the minimum (`1`).
    /// NOTE(review): presumably the locally configured limit of concurrent paths — confirm.
    max_concurrent_paths: NonZeroU32,
    /// Raised by one through `set_max_path_id` each time `close_path` abandons a path.
    /// NOTE(review): presumably the highest path id we allow the peer to use — confirm.
    local_max_path_id: PathId,
    /// `open_path` refuses path ids above this value and flags `paths_blocked` instead.
    remote_max_path_id: PathId,
    /// Starts at `PathId::ZERO`.
    /// NOTE(review): presumably the highest path id for which CIDs were issued — confirm.
    max_path_id_with_cids: PathId,
    /// Paths that have been abandoned; their ids are never reused by `open_path`.
    abandoned_paths: FxHashSet<PathId>,

    /// State of the `iroh_hp` extension; `poll_transmit` asks its server side for pending probes.
    iroh_hp: iroh_hp::State,
    /// Sink for qlog events.
    qlog: QlogSink,
}
321
322impl Connection {
323 pub(crate) fn new(
324 endpoint_config: Arc<EndpointConfig>,
325 config: Arc<TransportConfig>,
326 init_cid: ConnectionId,
327 loc_cid: ConnectionId,
328 rem_cid: ConnectionId,
329 remote: SocketAddr,
330 local_ip: Option<IpAddr>,
331 crypto: Box<dyn crypto::Session>,
332 cid_gen: &dyn ConnectionIdGenerator,
333 now: Instant,
334 version: u32,
335 allow_mtud: bool,
336 rng_seed: [u8; 32],
337 side_args: SideArgs,
338 qlog: QlogSink,
339 ) -> Self {
340 let pref_addr_cid = side_args.pref_addr_cid();
341 let path_validated = side_args.path_validated();
342 let connection_side = ConnectionSide::from(side_args);
343 let side = connection_side.side();
344 let mut rng = StdRng::from_seed(rng_seed);
345 let initial_space = {
346 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
347 space.crypto = Some(crypto.initial_keys(init_cid, side));
348 space
349 };
350 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
351 #[cfg(test)]
352 let data_space = match config.deterministic_packet_numbers {
353 true => PacketSpace::new_deterministic(now, SpaceId::Data),
354 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
355 };
356 #[cfg(not(test))]
357 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
358 let state = State::handshake(state::Handshake {
359 rem_cid_set: side.is_server(),
360 expected_token: Bytes::new(),
361 client_hello: None,
362 allow_server_migration: side.is_client(),
363 });
364 let local_cid_state = FxHashMap::from_iter([(
365 PathId::ZERO,
366 CidState::new(
367 cid_gen.cid_len(),
368 cid_gen.cid_lifetime(),
369 now,
370 if pref_addr_cid.is_some() { 2 } else { 1 },
371 ),
372 )]);
373
374 let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
375 path.open = true;
377 let mut this = Self {
378 endpoint_config,
379 crypto,
380 handshake_cid: loc_cid,
381 rem_handshake_cid: rem_cid,
382 local_cid_state,
383 paths: BTreeMap::from_iter([(
384 PathId::ZERO,
385 PathState {
386 data: path,
387 prev: None,
388 },
389 )]),
390 path_counter: 0,
391 allow_mtud,
392 local_ip,
393 state,
394 side: connection_side,
395 zero_rtt_enabled: false,
396 zero_rtt_crypto: None,
397 key_phase: false,
398 key_phase_size: rng.random_range(10..1000),
405 peer_params: TransportParameters::default(),
406 orig_rem_cid: rem_cid,
407 initial_dst_cid: init_cid,
408 retry_src_cid: None,
409 events: VecDeque::new(),
410 endpoint_events: VecDeque::new(),
411 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
412 spin: false,
413 spaces: [initial_space, handshake_space, data_space],
414 highest_space: SpaceId::Initial,
415 prev_crypto: None,
416 next_crypto: None,
417 accepted_0rtt: false,
418 permit_idle_reset: true,
419 idle_timeout: match config.max_idle_timeout {
420 None | Some(VarInt(0)) => None,
421 Some(dur) => Some(Duration::from_millis(dur.0)),
422 },
423 timers: TimerTable::default(),
424 authentication_failures: 0,
425 close: false,
426
427 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
428 &TransportParameters::default(),
429 )),
430
431 app_limited: false,
432 receiving_ecn: false,
433 total_authed_packets: 0,
434
435 next_observed_addr_seq_no: 0u32.into(),
436
437 streams: StreamsState::new(
438 side,
439 config.max_concurrent_uni_streams,
440 config.max_concurrent_bidi_streams,
441 config.send_window,
442 config.receive_window,
443 config.stream_receive_window,
444 ),
445 datagrams: DatagramState::default(),
446 config,
447 rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
448 rng,
449 stats: ConnectionStats::default(),
450 path_stats: Default::default(),
451 version,
452
453 max_concurrent_paths: NonZeroU32::MIN,
455 local_max_path_id: PathId::ZERO,
456 remote_max_path_id: PathId::ZERO,
457 max_path_id_with_cids: PathId::ZERO,
458 abandoned_paths: Default::default(),
459
460 iroh_hp: Default::default(),
462 qlog,
463 };
464 if path_validated {
465 this.on_path_validated(PathId::ZERO);
466 }
467 if side.is_client() {
468 this.write_crypto();
470 this.init_0rtt(now);
471 }
472 this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
473 this
474 }
475
476 #[must_use]
484 pub fn poll_timeout(&mut self) -> Option<Instant> {
485 self.timers.peek()
486 }
487
488 #[must_use]
494 pub fn poll(&mut self) -> Option<Event> {
495 if let Some(x) = self.events.pop_front() {
496 return Some(x);
497 }
498
499 if let Some(event) = self.streams.poll() {
500 return Some(Event::Stream(event));
501 }
502
503 if let Some(reason) = self.state.take_error() {
504 return Some(Event::ConnectionLost { reason });
505 }
506
507 None
508 }
509
510 #[must_use]
512 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
513 self.endpoint_events.pop_front().map(EndpointEvent)
514 }
515
516 #[must_use]
518 pub fn streams(&mut self) -> Streams<'_> {
519 Streams {
520 state: &mut self.streams,
521 conn_state: &self.state,
522 }
523 }
524
525 #[must_use]
527 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
528 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
529 RecvStream {
530 id,
531 state: &mut self.streams,
532 pending: &mut self.spaces[SpaceId::Data].pending,
533 }
534 }
535
536 #[must_use]
538 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
539 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
540 SendStream {
541 id,
542 state: &mut self.streams,
543 pending: &mut self.spaces[SpaceId::Data].pending,
544 conn_state: &self.state,
545 }
546 }
547
548 pub fn open_path_ensure(
555 &mut self,
556 remote: SocketAddr,
557 initial_status: PathStatus,
558 now: Instant,
559 ) -> Result<(PathId, bool), PathError> {
560 match self
561 .paths
562 .iter()
563 .find(|(_id, path)| path.data.remote == remote)
564 {
565 Some((path_id, _state)) => Ok((*path_id, true)),
566 None => self
567 .open_path(remote, initial_status, now)
568 .map(|id| (id, false)),
569 }
570 }
571
572 pub fn open_path(
577 &mut self,
578 remote: SocketAddr,
579 initial_status: PathStatus,
580 now: Instant,
581 ) -> Result<PathId, PathError> {
582 if !self.is_multipath_negotiated() {
583 return Err(PathError::MultipathNotNegotiated);
584 }
585 if self.side().is_server() {
586 return Err(PathError::ServerSideNotAllowed);
587 }
588
589 let max_abandoned = self.abandoned_paths.iter().max().copied();
590 let max_used = self.paths.keys().last().copied();
591 let path_id = max_abandoned
592 .max(max_used)
593 .unwrap_or(PathId::ZERO)
594 .saturating_add(1u8);
595
596 if Some(path_id) > self.max_path_id() {
597 return Err(PathError::MaxPathIdReached);
598 }
599 if path_id > self.remote_max_path_id {
600 self.spaces[SpaceId::Data].pending.paths_blocked = true;
601 return Err(PathError::MaxPathIdReached);
602 }
603 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
604 self.spaces[SpaceId::Data]
605 .pending
606 .path_cids_blocked
607 .push(path_id);
608 return Err(PathError::RemoteCidsExhausted);
609 }
610
611 let path = self.ensure_path(path_id, remote, now, None);
612 path.status.local_update(initial_status);
613
614 Ok(path_id)
615 }
616
617 pub fn close_path(
623 &mut self,
624 now: Instant,
625 path_id: PathId,
626 error_code: VarInt,
627 ) -> Result<(), ClosePathError> {
628 if self.abandoned_paths.contains(&path_id)
629 || Some(path_id) > self.max_path_id()
630 || !self.paths.contains_key(&path_id)
631 {
632 return Err(ClosePathError::ClosedPath);
633 }
634 if self
635 .paths
636 .iter()
637 .any(|(id, path)| {
639 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
640 })
641 .not()
642 {
643 return Err(ClosePathError::LastOpenPath);
644 }
645
646 self.spaces[SpaceId::Data]
648 .pending
649 .path_abandon
650 .insert(path_id, error_code.into());
651
652 let pending_space = &mut self.spaces[SpaceId::Data].pending;
654 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
655 pending_space.path_cids_blocked.retain(|&id| id != path_id);
656 pending_space.path_status.retain(|&id| id != path_id);
657
658 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
660 for sent_packet in space.sent_packets.values_mut() {
661 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
662 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
663 retransmits.path_cids_blocked.retain(|&id| id != path_id);
664 retransmits.path_status.retain(|&id| id != path_id);
665 }
666 }
667 }
668
669 self.rem_cids.remove(&path_id);
675 self.endpoint_events
676 .push_back(EndpointEventInner::RetireResetToken(path_id));
677
678 let pto = self.pto_max_path(SpaceId::Data, false);
679
680 let path = self.paths.get_mut(&path_id).expect("checked above");
681
682 path.data.last_allowed_receive = Some(now + 3 * pto);
684 self.abandoned_paths.insert(path_id);
685
686 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
687
688 self.timers.set(
693 Timer::PerPath(path_id, PathTimer::DiscardPath),
694 now + 6 * pto,
695 self.qlog.with_time(now),
696 );
697 Ok(())
698 }
699
700 #[track_caller]
704 fn path_data(&self, path_id: PathId) -> &PathData {
705 if let Some(data) = self.paths.get(&path_id) {
706 &data.data
707 } else {
708 panic!(
709 "unknown path: {path_id}, currently known paths: {:?}",
710 self.paths.keys().collect::<Vec<_>>()
711 );
712 }
713 }
714
715 fn path(&self, path_id: PathId) -> Option<&PathData> {
717 self.paths.get(&path_id).map(|path_state| &path_state.data)
718 }
719
720 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
722 self.paths
723 .get_mut(&path_id)
724 .map(|path_state| &mut path_state.data)
725 }
726
727 pub fn paths(&self) -> Vec<PathId> {
731 self.paths.keys().copied().collect()
732 }
733
734 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
736 self.path(path_id)
737 .map(PathData::local_status)
738 .ok_or(ClosedPath { _private: () })
739 }
740
741 pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
743 self.path(path_id)
744 .map(|path| path.remote)
745 .ok_or(ClosedPath { _private: () })
746 }
747
748 pub fn set_path_status(
752 &mut self,
753 path_id: PathId,
754 status: PathStatus,
755 ) -> Result<PathStatus, SetPathStatusError> {
756 if !self.is_multipath_negotiated() {
757 return Err(SetPathStatusError::MultipathNotNegotiated);
758 }
759 let path = self
760 .path_mut(path_id)
761 .ok_or(SetPathStatusError::ClosedPath)?;
762 let prev = match path.status.local_update(status) {
763 Some(prev) => {
764 self.spaces[SpaceId::Data]
765 .pending
766 .path_status
767 .insert(path_id);
768 prev
769 }
770 None => path.local_status(),
771 };
772 Ok(prev)
773 }
774
775 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
780 self.path(path_id).and_then(|path| path.remote_status())
781 }
782
783 pub fn set_path_max_idle_timeout(
789 &mut self,
790 path_id: PathId,
791 timeout: Option<Duration>,
792 ) -> Result<Option<Duration>, ClosedPath> {
793 let path = self
794 .paths
795 .get_mut(&path_id)
796 .ok_or(ClosedPath { _private: () })?;
797 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
798 }
799
800 pub fn set_path_keep_alive_interval(
806 &mut self,
807 path_id: PathId,
808 interval: Option<Duration>,
809 ) -> Result<Option<Duration>, ClosedPath> {
810 let path = self
811 .paths
812 .get_mut(&path_id)
813 .ok_or(ClosedPath { _private: () })?;
814 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
815 }
816
817 #[track_caller]
821 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
822 &mut self.paths.get_mut(&path_id).expect("known path").data
823 }
824
825 fn ensure_path(
826 &mut self,
827 path_id: PathId,
828 remote: SocketAddr,
829 now: Instant,
830 pn: Option<u64>,
831 ) -> &mut PathData {
832 let vacant_entry = match self.paths.entry(path_id) {
833 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
834 btree_map::Entry::Occupied(occupied_entry) => {
835 return &mut occupied_entry.into_mut().data;
836 }
837 };
838
839 debug!(%path_id, ?remote, "path added");
842 let peer_max_udp_payload_size =
843 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
844 self.path_counter = self.path_counter.wrapping_add(1);
845 let mut data = PathData::new(
846 remote,
847 self.allow_mtud,
848 Some(peer_max_udp_payload_size),
849 self.path_counter,
850 now,
851 &self.config,
852 );
853
854 let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
855 self.timers.set(
856 Timer::PerPath(path_id, PathTimer::PathOpen),
857 now + 3 * pto,
858 self.qlog.with_time(now),
859 );
860
861 data.send_new_challenge = true;
864
865 let path = vacant_entry.insert(PathState { data, prev: None });
866
867 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
868 if let Some(pn) = pn {
869 pn_space.dedup.insert(pn);
870 }
871 self.spaces[SpaceId::Data]
872 .number_spaces
873 .insert(path_id, pn_space);
874 self.qlog.emit_tuple_assigned(path_id, remote, now);
875 &mut path.data
876 }
877
878 #[must_use]
888 pub fn poll_transmit(
889 &mut self,
890 now: Instant,
891 max_datagrams: NonZeroUsize,
892 buf: &mut Vec<u8>,
893 ) -> Option<Transmit> {
894 if let Some(probing) = self
895 .iroh_hp
896 .server_side_mut()
897 .ok()
898 .and_then(iroh_hp::ServerState::next_probe)
899 {
900 let destination = probing.remote();
901 trace!(%destination, "RAND_DATA packet");
902 let token: u64 = self.rng.random();
903 buf.put_u64(token);
904 probing.finish(token);
905 return Some(Transmit {
906 destination,
907 ecn: None,
908 size: 8,
909 segment_size: None,
910 src_ip: None,
911 });
912 }
913
914 let max_datagrams = match self.config.enable_segmentation_offload {
915 false => NonZeroUsize::MIN,
916 true => max_datagrams,
917 };
918
919 let close = match self.state.as_type() {
938 StateType::Drained => {
939 self.app_limited = true;
940 return None;
941 }
942 StateType::Draining | StateType::Closed => {
943 if !self.close {
946 self.app_limited = true;
947 return None;
948 }
949 true
950 }
951 _ => false,
952 };
953
954 if let Some(config) = &self.config.ack_frequency_config {
956 let rtt = self
957 .paths
958 .values()
959 .map(|p| p.data.rtt.get())
960 .min()
961 .expect("one path exists");
962 self.spaces[SpaceId::Data].pending.ack_frequency = self
963 .ack_frequency
964 .should_send_ack_frequency(rtt, config, &self.peer_params)
965 && self.highest_space == SpaceId::Data
966 && self.peer_supports_ack_frequency();
967 }
968
969 let mut coalesce = true;
971
972 let mut pad_datagram = PadDatagram::No;
975
976 let mut congestion_blocked = false;
980
981 let mut last_packet_number = None;
983
984 let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
985
986 let have_available_path = self.paths.iter().any(|(id, path)| {
989 path.data.validated
990 && path.data.local_status() == PathStatus::Available
991 && self.rem_cids.contains_key(id)
992 });
993
994 let mut transmit = TransmitBuf::new(
996 buf,
997 max_datagrams,
998 self.path_data(path_id).current_mtu().into(),
999 );
1000 if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1001 return Some(challenge);
1002 }
1003 let mut space_id = match path_id {
1004 PathId::ZERO => SpaceId::Initial,
1005 _ => SpaceId::Data,
1006 };
1007
1008 loop {
1009 let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1011 let err = PathError::RemoteCidsExhausted;
1012 if !self.abandoned_paths.contains(&path_id) {
1013 debug!(?err, %path_id, "no active CID for path");
1014 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1015 id: path_id,
1016 error: err,
1017 }));
1018 self.close_path(
1022 now,
1023 path_id,
1024 TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1025 )
1026 .ok();
1027 self.spaces[SpaceId::Data]
1028 .pending
1029 .path_cids_blocked
1030 .push(path_id);
1031 } else {
1032 trace!(%path_id, "remote CIDs retired for abandoned path");
1033 }
1034
1035 match self.paths.keys().find(|&&next| next > path_id) {
1036 Some(next_path_id) => {
1037 path_id = *next_path_id;
1039 space_id = SpaceId::Data;
1040
1041 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1043 if let Some(challenge) =
1044 self.send_prev_path_challenge(now, &mut transmit, path_id)
1045 {
1046 return Some(challenge);
1047 }
1048
1049 continue;
1050 }
1051 None => {
1052 trace!(
1054 ?space_id,
1055 %path_id,
1056 "no CIDs to send on path, no more paths"
1057 );
1058 break;
1059 }
1060 }
1061 };
1062
1063 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1066 transmit.datagram_remaining_mut()
1068 } else {
1069 transmit.segment_size()
1071 };
1072 let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1073 let path_should_send = {
1074 let path_exclusive_only = space_id == SpaceId::Data
1075 && have_available_path
1076 && self.path_data(path_id).local_status() == PathStatus::Backup;
1077 let path_should_send = if path_exclusive_only {
1078 can_send.path_exclusive
1079 } else {
1080 !can_send.is_empty()
1081 };
1082 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1083 path_should_send || needs_loss_probe
1084 };
1085
1086 if !path_should_send && space_id < SpaceId::Data {
1087 if self.spaces[space_id].crypto.is_some() {
1088 trace!(?space_id, %path_id, "nothing to send in space");
1089 }
1090 space_id = space_id.next();
1091 continue;
1092 }
1093
1094 let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1095 self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1097 } else {
1098 PathBlocked::No
1099 };
1100 if send_blocked != PathBlocked::No {
1101 trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1102 congestion_blocked = true;
1103 }
1104 if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1105 space_id = space_id.next();
1108 continue;
1109 }
1110 if !path_should_send || send_blocked != PathBlocked::No {
1111 if transmit.num_datagrams() > 0 {
1116 break;
1117 }
1118
1119 match self.paths.keys().find(|&&next| next > path_id) {
1120 Some(next_path_id) => {
1121 trace!(
1123 ?space_id,
1124 %path_id,
1125 %next_path_id,
1126 "nothing to send on path"
1127 );
1128 path_id = *next_path_id;
1129 space_id = SpaceId::Data;
1130
1131 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1133 if let Some(challenge) =
1134 self.send_prev_path_challenge(now, &mut transmit, path_id)
1135 {
1136 return Some(challenge);
1137 }
1138
1139 continue;
1140 }
1141 None => {
1142 trace!(
1144 ?space_id,
1145 %path_id,
1146 next_path_id=?None::<PathId>,
1147 "nothing to send on path"
1148 );
1149 break;
1150 }
1151 }
1152 }
1153
1154 if transmit.datagram_remaining_mut() == 0 {
1156 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1157 break;
1159 }
1160
1161 match self.spaces[space_id].for_path(path_id).loss_probes {
1162 0 => transmit.start_new_datagram(),
1163 _ => {
1164 let request_immediate_ack =
1166 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1167 self.spaces[space_id].maybe_queue_probe(
1168 path_id,
1169 request_immediate_ack,
1170 &self.streams,
1171 );
1172
1173 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1174
1175 transmit.start_new_datagram_with_size(std::cmp::min(
1179 usize::from(INITIAL_MTU),
1180 transmit.segment_size(),
1181 ));
1182 }
1183 }
1184 trace!(count = transmit.num_datagrams(), "new datagram started");
1185 coalesce = true;
1186 pad_datagram = PadDatagram::No;
1187 }
1188
1189 if transmit.datagram_start_offset() < transmit.len() {
1192 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1193 }
1194
1195 if self.spaces[SpaceId::Initial].crypto.is_some()
1200 && space_id == SpaceId::Handshake
1201 && self.side.is_client()
1202 {
1203 self.discard_space(now, SpaceId::Initial);
1206 }
1207 if let Some(ref mut prev) = self.prev_crypto {
1208 prev.update_unacked = false;
1209 }
1210
1211 let mut qlog = QlogSentPacket::default();
1212 let mut builder = PacketBuilder::new(
1213 now,
1214 space_id,
1215 path_id,
1216 remote_cid,
1217 &mut transmit,
1218 can_send.other,
1219 self,
1220 &mut qlog,
1221 )?;
1222 last_packet_number = Some(builder.exact_number);
1223 coalesce = coalesce && !builder.short_header;
1224
1225 if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1226 pad_datagram |= PadDatagram::ToMinMtu;
1228 }
1229 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1230 pad_datagram |= PadDatagram::ToSegmentSize;
1231 }
1232
1233 if can_send.close {
1234 trace!("sending CONNECTION_CLOSE");
1235 let mut sent_frames = SentFrames::default();
1240 let is_multipath_negotiated = self.is_multipath_negotiated();
1241 for path_id in self.spaces[space_id]
1242 .number_spaces
1243 .iter()
1244 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1245 .map(|(&path_id, _)| path_id)
1246 .collect::<Vec<_>>()
1247 {
1248 Self::populate_acks(
1249 now,
1250 self.receiving_ecn,
1251 &mut sent_frames,
1252 path_id,
1253 space_id,
1254 &mut self.spaces[space_id],
1255 is_multipath_negotiated,
1256 &mut builder.frame_space_mut(),
1257 &mut self.stats,
1258 &mut qlog,
1259 );
1260 }
1261
1262 debug_assert!(
1266 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1267 "ACKs should leave space for ConnectionClose"
1268 );
1269 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1270 let max_frame_size = builder.frame_space_remaining();
1271 match self.state.as_type() {
1272 StateType::Closed => {
1273 let reason: Close =
1274 self.state.as_closed().expect("checked").clone().into();
1275 if space_id == SpaceId::Data || reason.is_transport_layer() {
1276 reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1277 qlog.frame(&Frame::Close(reason));
1278 } else {
1279 let frame = frame::ConnectionClose {
1280 error_code: TransportErrorCode::APPLICATION_ERROR,
1281 frame_type: None,
1282 reason: Bytes::new(),
1283 };
1284 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1285 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1286 }
1287 }
1288 StateType::Draining => {
1289 let frame = frame::ConnectionClose {
1290 error_code: TransportErrorCode::NO_ERROR,
1291 frame_type: None,
1292 reason: Bytes::new(),
1293 };
1294 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1295 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1296 }
1297 _ => unreachable!(
1298 "tried to make a close packet when the connection wasn't closed"
1299 ),
1300 };
1301 }
1302 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1303 if space_id == self.highest_space {
1304 self.close = false;
1307 break;
1309 } else {
1310 space_id = space_id.next();
1314 continue;
1315 }
1316 }
1317
1318 if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1321 let path = self.path_data_mut(path_id);
1322 if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1323 let response = frame::PathResponse(token);
1327 trace!(%response, "(off-path)");
1328 builder.frame_space_mut().write(response);
1329 qlog.frame(&Frame::PathResponse(response));
1330 self.stats.frame_tx.path_response += 1;
1331 builder.finish_and_track(
1332 now,
1333 self,
1334 path_id,
1335 SentFrames {
1336 non_retransmits: true,
1337 ..SentFrames::default()
1338 },
1339 PadDatagram::ToMinMtu,
1340 qlog,
1341 );
1342 self.stats.udp_tx.on_sent(1, transmit.len());
1343 return Some(Transmit {
1344 destination: remote,
1345 size: transmit.len(),
1346 ecn: None,
1347 segment_size: None,
1348 src_ip: self.local_ip,
1349 });
1350 }
1351 }
1352
1353 let sent_frames = {
1354 let path_exclusive_only = have_available_path
1355 && self.path_data(path_id).local_status() == PathStatus::Backup;
1356 let pn = builder.exact_number;
1357 self.populate_packet(
1358 now,
1359 space_id,
1360 path_id,
1361 path_exclusive_only,
1362 &mut builder.frame_space_mut(),
1363 pn,
1364 &mut qlog,
1365 )
1366 };
1367
1368 debug_assert!(
1375 !(sent_frames.is_ack_only(&self.streams)
1376 && !can_send.acks
1377 && can_send.other
1378 && builder.buf.segment_size()
1379 == self.path_data(path_id).current_mtu() as usize
1380 && self.datagrams.outgoing.is_empty()),
1381 "SendableFrames was {can_send:?}, but only ACKs have been written"
1382 );
1383 if sent_frames.requires_padding {
1384 pad_datagram |= PadDatagram::ToMinMtu;
1385 }
1386
1387 for (path_id, _pn) in sent_frames.largest_acked.iter() {
1388 self.spaces[space_id]
1389 .for_path(*path_id)
1390 .pending_acks
1391 .acks_sent();
1392 self.timers.stop(
1393 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1394 self.qlog.with_time(now),
1395 );
1396 }
1397
1398 if coalesce
1406 && builder
1407 .buf
1408 .datagram_remaining_mut()
1409 .saturating_sub(builder.predict_packet_end())
1410 > MIN_PACKET_SPACE
1411 && self
1412 .next_send_space(space_id, path_id, builder.buf, close)
1413 .is_some()
1414 {
1415 builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1418 } else {
1419 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1422 const MAX_PADDING: usize = 32;
1430 if builder.buf.datagram_remaining_mut()
1431 > builder.predict_packet_end() + MAX_PADDING
1432 {
1433 trace!(
1434 "GSO truncated by demand for {} padding bytes",
1435 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1436 );
1437 builder.finish_and_track(
1438 now,
1439 self,
1440 path_id,
1441 sent_frames,
1442 PadDatagram::No,
1443 qlog,
1444 );
1445 break;
1446 }
1447
1448 builder.finish_and_track(
1451 now,
1452 self,
1453 path_id,
1454 sent_frames,
1455 PadDatagram::ToSegmentSize,
1456 qlog,
1457 );
1458 } else {
1459 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1460 }
1461 if transmit.num_datagrams() == 1 {
1462 transmit.clip_datagram_size();
1463 }
1464 }
1465 }
1466
1467 if let Some(last_packet_number) = last_packet_number {
1468 self.path_data_mut(path_id).congestion.on_sent(
1471 now,
1472 transmit.len() as u64,
1473 last_packet_number,
1474 );
1475 }
1476
1477 self.qlog.emit_recovery_metrics(
1478 path_id,
1479 &mut self.paths.get_mut(&path_id).unwrap().data,
1480 now,
1481 );
1482
1483 self.app_limited = transmit.is_empty() && !congestion_blocked;
1484
1485 if transmit.is_empty() && self.state.is_established() {
1487 let space_id = SpaceId::Data;
1489 path_id = *self.paths.first_key_value().expect("one path must exist").0;
1490 let probe_data = loop {
1491 let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1497 let eligible = self.path_data(path_id).validated
1498 && !self.path_data(path_id).is_validating_path()
1499 && !self.abandoned_paths.contains(&path_id);
1500 let probe_size = eligible
1501 .then(|| {
1502 let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1503 self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1504 })
1505 .flatten();
1506 match (active_cid, probe_size) {
1507 (Some(active_cid), Some(probe_size)) => {
1508 break Some((active_cid, probe_size));
1510 }
1511 _ => {
1512 match self.paths.keys().find(|&&next| next > path_id) {
1514 Some(next) => {
1515 path_id = *next;
1516 continue;
1517 }
1518 None => break None,
1519 }
1520 }
1521 }
1522 };
1523 if let Some((active_cid, probe_size)) = probe_data {
1524 debug_assert_eq!(transmit.num_datagrams(), 0);
1526 transmit.start_new_datagram_with_size(probe_size as usize);
1527
1528 let mut qlog = QlogSentPacket::default();
1529 let mut builder = PacketBuilder::new(
1530 now,
1531 space_id,
1532 path_id,
1533 active_cid,
1534 &mut transmit,
1535 true,
1536 self,
1537 &mut qlog,
1538 )?;
1539
1540 trace!(?probe_size, "writing MTUD probe");
1542 trace!("PING");
1543 builder.frame_space_mut().write(frame::FrameType::PING);
1544 qlog.frame(&Frame::Ping);
1545 self.stats.frame_tx.ping += 1;
1546
1547 if self.peer_supports_ack_frequency() {
1549 trace!("IMMEDIATE_ACK");
1550 builder
1551 .frame_space_mut()
1552 .write(frame::FrameType::IMMEDIATE_ACK);
1553 self.stats.frame_tx.immediate_ack += 1;
1554 qlog.frame(&Frame::ImmediateAck);
1555 }
1556
1557 let sent_frames = SentFrames {
1558 non_retransmits: true,
1559 ..Default::default()
1560 };
1561 builder.finish_and_track(
1562 now,
1563 self,
1564 path_id,
1565 sent_frames,
1566 PadDatagram::ToSize(probe_size),
1567 qlog,
1568 );
1569
1570 self.path_stats
1571 .entry(path_id)
1572 .or_default()
1573 .sent_plpmtud_probes += 1;
1574 }
1575 }
1576
1577 if transmit.is_empty() {
1578 return None;
1579 }
1580
1581 let destination = self.path_data(path_id).remote;
1582 trace!(
1583 segment_size = transmit.segment_size(),
1584 last_datagram_len = transmit.len() % transmit.segment_size(),
1585 ?destination,
1586 "sending {} bytes in {} datagrams",
1587 transmit.len(),
1588 transmit.num_datagrams()
1589 );
1590 self.path_data_mut(path_id)
1591 .inc_total_sent(transmit.len() as u64);
1592
1593 self.stats
1594 .udp_tx
1595 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1596
1597 Some(Transmit {
1598 destination,
1599 size: transmit.len(),
1600 ecn: if self.path_data(path_id).sending_ecn {
1601 Some(EcnCodepoint::Ect0)
1602 } else {
1603 None
1604 },
1605 segment_size: match transmit.num_datagrams() {
1606 1 => None,
1607 _ => Some(transmit.segment_size()),
1608 },
1609 src_ip: self.local_ip,
1610 })
1611 }
1612
1613 fn next_send_space(
1618 &mut self,
1619 current_space_id: SpaceId,
1620 path_id: PathId,
1621 buf: &TransmitBuf<'_>,
1622 close: bool,
1623 ) -> Option<SpaceId> {
1624 let mut space_id = current_space_id;
1631 loop {
1632 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1633 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1634 return Some(space_id);
1635 }
1636 space_id = match space_id {
1637 SpaceId::Initial => SpaceId::Handshake,
1638 SpaceId::Handshake => SpaceId::Data,
1639 SpaceId::Data => break,
1640 }
1641 }
1642 None
1643 }
1644
1645 fn path_congestion_check(
1647 &mut self,
1648 space_id: SpaceId,
1649 path_id: PathId,
1650 transmit: &TransmitBuf<'_>,
1651 can_send: &SendableFrames,
1652 now: Instant,
1653 ) -> PathBlocked {
1654 if self.side().is_server()
1660 && self
1661 .path_data(path_id)
1662 .anti_amplification_blocked(transmit.len() as u64 + 1)
1663 {
1664 trace!(?space_id, %path_id, "blocked by anti-amplification");
1665 return PathBlocked::AntiAmplification;
1666 }
1667
1668 let bytes_to_send = transmit.segment_size() as u64;
1671 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1672
1673 if can_send.other && !need_loss_probe && !can_send.close {
1674 let path = self.path_data(path_id);
1675 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1676 trace!(?space_id, %path_id, "blocked by congestion control");
1677 return PathBlocked::Congestion;
1678 }
1679 }
1680
1681 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1683 self.timers.set(
1684 Timer::PerPath(path_id, PathTimer::Pacing),
1685 delay,
1686 self.qlog.with_time(now),
1687 );
1688 trace!(?space_id, %path_id, "blocked by pacing");
1691 return PathBlocked::Pacing;
1692 }
1693
1694 PathBlocked::No
1695 }
1696
1697 fn send_prev_path_challenge(
1702 &mut self,
1703 now: Instant,
1704 buf: &mut TransmitBuf<'_>,
1705 path_id: PathId,
1706 ) -> Option<Transmit> {
1707 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1708 if !prev_path.send_new_challenge {
1711 return None;
1712 };
1713 prev_path.send_new_challenge = false;
1714 let destination = prev_path.remote;
1715 let token = self.rng.random();
1716 let info = paths::SentChallengeInfo {
1717 sent_instant: now,
1718 remote: destination,
1719 };
1720 prev_path.challenges_sent.insert(token, info);
1721 debug_assert_eq!(
1722 self.highest_space,
1723 SpaceId::Data,
1724 "PATH_CHALLENGE queued without 1-RTT keys"
1725 );
1726 buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1727
1728 debug_assert_eq!(buf.datagram_start_offset(), 0);
1734 let mut qlog = QlogSentPacket::default();
1735 let mut builder = PacketBuilder::new(
1736 now,
1737 SpaceId::Data,
1738 path_id,
1739 *prev_cid,
1740 buf,
1741 false,
1742 self,
1743 &mut qlog,
1744 )?;
1745 let challenge = frame::PathChallenge(token);
1746 trace!(%challenge, "validating previous path");
1747 qlog.frame(&Frame::PathChallenge(challenge));
1748 builder.frame_space_mut().write(challenge);
1749 self.stats.frame_tx.path_challenge += 1;
1750
1751 builder.pad_to(MIN_INITIAL_SIZE);
1756
1757 builder.finish(self, now, qlog);
1758 self.stats.udp_tx.on_sent(1, buf.len());
1759
1760 Some(Transmit {
1761 destination,
1762 size: buf.len(),
1763 ecn: None,
1764 segment_size: None,
1765 src_ip: self.local_ip,
1766 })
1767 }
1768
1769 fn space_can_send(
1774 &mut self,
1775 space_id: SpaceId,
1776 path_id: PathId,
1777 packet_size: usize,
1778 close: bool,
1779 ) -> SendableFrames {
1780 let pn = self.spaces[SpaceId::Data]
1781 .for_path(path_id)
1782 .peek_tx_number();
1783 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1784 if self.spaces[space_id].crypto.is_none()
1785 && (space_id != SpaceId::Data
1786 || self.zero_rtt_crypto.is_none()
1787 || self.side.is_server())
1788 {
1789 return SendableFrames::empty();
1791 }
1792 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1793 if space_id == SpaceId::Data {
1794 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1795 }
1796
1797 can_send.close = close && self.spaces[space_id].crypto.is_some();
1798
1799 can_send
1800 }
1801
1802 pub fn handle_event(&mut self, event: ConnectionEvent) {
1808 use ConnectionEventInner::*;
1809 match event.0 {
1810 Datagram(DatagramConnectionEvent {
1811 now,
1812 remote,
1813 path_id,
1814 ecn,
1815 first_decode,
1816 remaining,
1817 }) => {
1818 let span = trace_span!("pkt", %path_id);
1819 let _guard = span.enter();
1820 if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
1824 if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
1825 trace!(
1826 %path_id,
1827 ?remote,
1828 path_remote = ?self.path(path_id).map(|p| p.remote),
1829 "discarding packet from unrecognized peer"
1830 );
1831 return;
1832 }
1833 }
1834
1835 let was_anti_amplification_blocked = self
1836 .path(path_id)
1837 .map(|path| path.anti_amplification_blocked(1))
1838 .unwrap_or(true); self.stats.udp_rx.datagrams += 1;
1842 self.stats.udp_rx.bytes += first_decode.len() as u64;
1843 let data_len = first_decode.len();
1844
1845 self.handle_decode(now, remote, path_id, ecn, first_decode);
1846 if let Some(path) = self.path_mut(path_id) {
1851 path.inc_total_recvd(data_len as u64);
1852 }
1853
1854 if let Some(data) = remaining {
1855 self.stats.udp_rx.bytes += data.len() as u64;
1856 self.handle_coalesced(now, remote, path_id, ecn, data);
1857 }
1858
1859 if let Some(path) = self.paths.get_mut(&path_id) {
1860 self.qlog
1861 .emit_recovery_metrics(path_id, &mut path.data, now);
1862 }
1863
1864 if was_anti_amplification_blocked {
1865 self.set_loss_detection_timer(now, path_id);
1869 }
1870 }
1871 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1872 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1873 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1874 let cid_state = self
1875 .local_cid_state
1876 .entry(path_id)
1877 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1878 cid_state.new_cids(&ids, now);
1879
1880 ids.into_iter().rev().for_each(|frame| {
1881 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1882 });
1883 self.reset_cid_retirement(now);
1885 }
1886 }
1887 }
1888
1889 pub fn handle_timeout(&mut self, now: Instant) {
1899 while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
1900 trace!(?timer, at=?now, "timeout");
1902 match timer {
1903 Timer::Conn(timer) => match timer {
1904 ConnTimer::Close => {
1905 self.state.move_to_drained(None);
1906 self.endpoint_events.push_back(EndpointEventInner::Drained);
1907 }
1908 ConnTimer::Idle => {
1909 self.kill(ConnectionError::TimedOut);
1910 }
1911 ConnTimer::KeepAlive => {
1912 trace!("sending keep-alive");
1913 self.ping();
1914 }
1915 ConnTimer::KeyDiscard => {
1916 self.zero_rtt_crypto = None;
1917 self.prev_crypto = None;
1918 }
1919 ConnTimer::PushNewCid => {
1920 while let Some((path_id, when)) = self.next_cid_retirement() {
1921 if when > now {
1922 break;
1923 }
1924 match self.local_cid_state.get_mut(&path_id) {
1925 None => error!(%path_id, "No local CID state for path"),
1926 Some(cid_state) => {
1927 let num_new_cid = cid_state.on_cid_timeout().into();
1929 if !self.state.is_closed() {
1930 trace!(
1931 "push a new CID to peer RETIRE_PRIOR_TO field {}",
1932 cid_state.retire_prior_to()
1933 );
1934 self.endpoint_events.push_back(
1935 EndpointEventInner::NeedIdentifiers(
1936 path_id,
1937 now,
1938 num_new_cid,
1939 ),
1940 );
1941 }
1942 }
1943 }
1944 }
1945 }
1946 },
1947 Timer::PerPath(path_id, timer) => {
1949 let span = trace_span!("per-path timer fired", %path_id, ?timer);
1950 let _guard = span.enter();
1951 match timer {
1952 PathTimer::PathIdle => {
1953 self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
1954 .ok();
1955 }
1956
1957 PathTimer::PathKeepAlive => {
1958 trace!("sending keep-alive on path");
1959 self.ping_path(path_id).ok();
1960 }
1961 PathTimer::LossDetection => {
1962 self.on_loss_detection_timeout(now, path_id);
1963 self.qlog.emit_recovery_metrics(
1964 path_id,
1965 &mut self.paths.get_mut(&path_id).unwrap().data,
1966 now,
1967 );
1968 }
1969 PathTimer::PathValidation => {
1970 let Some(path) = self.paths.get_mut(&path_id) else {
1971 continue;
1972 };
1973 self.timers.stop(
1974 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
1975 self.qlog.with_time(now),
1976 );
1977 debug!("path validation failed");
1978 if let Some((_, prev)) = path.prev.take() {
1979 path.data = prev;
1980 }
1981 path.data.challenges_sent.clear();
1982 path.data.send_new_challenge = false;
1983 }
1984 PathTimer::PathChallengeLost => {
1985 let Some(path) = self.paths.get_mut(&path_id) else {
1986 continue;
1987 };
1988 trace!("path challenge deemed lost");
1989 path.data.send_new_challenge = true;
1990 }
1991 PathTimer::PathOpen => {
1992 let Some(path) = self.path_mut(path_id) else {
1993 continue;
1994 };
1995 path.challenges_sent.clear();
1996 path.send_new_challenge = false;
1997 debug!("new path validation failed");
1998 if let Err(err) = self.close_path(
1999 now,
2000 path_id,
2001 TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
2002 ) {
2003 warn!(?err, "failed closing path");
2004 }
2005
2006 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
2007 id: path_id,
2008 error: PathError::ValidationFailed,
2009 }));
2010 }
2011 PathTimer::Pacing => trace!("pacing timer expired"),
2012 PathTimer::MaxAckDelay => {
2013 trace!("max ack delay reached");
2014 self.spaces[SpaceId::Data]
2016 .for_path(path_id)
2017 .pending_acks
2018 .on_max_ack_delay_timeout()
2019 }
2020 PathTimer::DiscardPath => {
2021 self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2024 if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
2025 let (min_seq, max_seq) = loc_cid_state.active_seq();
2026 for seq in min_seq..=max_seq {
2027 self.endpoint_events.push_back(
2028 EndpointEventInner::RetireConnectionId(
2029 now, path_id, seq, false,
2030 ),
2031 );
2032 }
2033 }
2034 self.discard_path(path_id, now);
2035 }
2036 }
2037 }
2038 }
2039 }
2040 }
2041
2042 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2054 self.close_inner(
2055 now,
2056 Close::Application(frame::ApplicationClose { error_code, reason }),
2057 )
2058 }
2059
2060 fn close_inner(&mut self, now: Instant, reason: Close) {
2061 let was_closed = self.state.is_closed();
2062 if !was_closed {
2063 self.close_common();
2064 self.set_close_timer(now);
2065 self.close = true;
2066 self.state.move_to_closed_local(reason);
2067 }
2068 }
2069
2070 pub fn datagrams(&mut self) -> Datagrams<'_> {
2072 Datagrams { conn: self }
2073 }
2074
2075 pub fn stats(&mut self) -> ConnectionStats {
2077 self.stats.clone()
2078 }
2079
2080 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2082 let path = self.paths.get(&path_id)?;
2083 let stats = self.path_stats.entry(path_id).or_default();
2084 stats.rtt = path.data.rtt.get();
2085 stats.cwnd = path.data.congestion.window();
2086 stats.current_mtu = path.data.mtud.current_mtu();
2087 Some(*stats)
2088 }
2089
2090 pub fn ping(&mut self) {
2094 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2097 path_data.ping_pending = true;
2098 }
2099 }
2100
2101 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2105 let path_data = self.spaces[self.highest_space]
2106 .number_spaces
2107 .get_mut(&path)
2108 .ok_or(ClosedPath { _private: () })?;
2109 path_data.ping_pending = true;
2110 Ok(())
2111 }
2112
2113 pub fn force_key_update(&mut self) {
2117 if !self.state.is_established() {
2118 debug!("ignoring forced key update in illegal state");
2119 return;
2120 }
2121 if self.prev_crypto.is_some() {
2122 debug!("ignoring redundant forced key update");
2125 return;
2126 }
2127 self.update_keys(None, false);
2128 }
2129
2130 #[doc(hidden)]
2132 #[deprecated]
2133 pub fn initiate_key_update(&mut self) {
2134 self.force_key_update();
2135 }
2136
2137 pub fn crypto_session(&self) -> &dyn crypto::Session {
2139 &*self.crypto
2140 }
2141
2142 pub fn is_handshaking(&self) -> bool {
2147 self.state.is_handshake()
2148 }
2149
2150 pub fn is_closed(&self) -> bool {
2158 self.state.is_closed()
2159 }
2160
2161 pub fn is_drained(&self) -> bool {
2166 self.state.is_drained()
2167 }
2168
2169 pub fn accepted_0rtt(&self) -> bool {
2173 self.accepted_0rtt
2174 }
2175
2176 pub fn has_0rtt(&self) -> bool {
2178 self.zero_rtt_enabled
2179 }
2180
2181 pub fn has_pending_retransmits(&self) -> bool {
2183 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2184 }
2185
2186 pub fn side(&self) -> Side {
2188 self.side.side()
2189 }
2190
2191 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2193 self.path(path_id)
2194 .map(|path_data| {
2195 path_data
2196 .last_observed_addr_report
2197 .as_ref()
2198 .map(|observed| observed.socket_addr())
2199 })
2200 .ok_or(ClosedPath { _private: () })
2201 }
2202
2203 pub fn local_ip(&self) -> Option<IpAddr> {
2213 self.local_ip
2214 }
2215
2216 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2218 self.path(path_id).map(|d| d.rtt.get())
2219 }
2220
2221 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2223 self.path(path_id).map(|d| d.congestion.as_ref())
2224 }
2225
2226 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2231 self.streams.set_max_concurrent(dir, count);
2232 let pending = &mut self.spaces[SpaceId::Data].pending;
2235 self.streams.queue_max_stream_id(pending);
2236 }
2237
2238 pub fn set_max_concurrent_paths(
2248 &mut self,
2249 now: Instant,
2250 count: NonZeroU32,
2251 ) -> Result<(), MultipathNotNegotiated> {
2252 if !self.is_multipath_negotiated() {
2253 return Err(MultipathNotNegotiated { _private: () });
2254 }
2255 self.max_concurrent_paths = count;
2256
2257 let in_use_count = self
2258 .local_max_path_id
2259 .next()
2260 .saturating_sub(self.abandoned_paths.len() as u32)
2261 .as_u32();
2262 let extra_needed = count.get().saturating_sub(in_use_count);
2263 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2264
2265 self.set_max_path_id(now, new_max_path_id);
2266
2267 Ok(())
2268 }
2269
2270 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2272 if max_path_id <= self.local_max_path_id {
2273 return;
2274 }
2275
2276 self.local_max_path_id = max_path_id;
2277 self.spaces[SpaceId::Data].pending.max_path_id = true;
2278
2279 self.issue_first_path_cids(now);
2280 }
2281
2282 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2288 self.streams.max_concurrent(dir)
2289 }
2290
2291 pub fn set_send_window(&mut self, send_window: u64) {
2293 self.streams.set_send_window(send_window);
2294 }
2295
2296 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2298 if self.streams.set_receive_window(receive_window) {
2299 self.spaces[SpaceId::Data].pending.max_data = true;
2300 }
2301 }
2302
2303 pub fn is_multipath_negotiated(&self) -> bool {
2308 !self.is_handshaking()
2309 && self.config.max_concurrent_multipath_paths.is_some()
2310 && self.peer_params.initial_max_path_id.is_some()
2311 }
2312
2313 fn on_ack_received(
2314 &mut self,
2315 now: Instant,
2316 space: SpaceId,
2317 ack: frame::Ack,
2318 ) -> Result<(), TransportError> {
2319 let path = PathId::ZERO;
2321 self.inner_on_ack_received(now, space, path, ack)
2322 }
2323
2324 fn on_path_ack_received(
2325 &mut self,
2326 now: Instant,
2327 space: SpaceId,
2328 path_ack: frame::PathAck,
2329 ) -> Result<(), TransportError> {
2330 let (ack, path) = path_ack.into_ack();
2331 self.inner_on_ack_received(now, space, path, ack)
2332 }
2333
2334 fn inner_on_ack_received(
2336 &mut self,
2337 now: Instant,
2338 space: SpaceId,
2339 path: PathId,
2340 ack: frame::Ack,
2341 ) -> Result<(), TransportError> {
2342 if self.abandoned_paths.contains(&path) {
2343 trace!("silently ignoring PATH_ACK on abandoned path");
2346 return Ok(());
2347 }
2348 if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2349 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2350 }
2351 let new_largest = {
2352 let space = &mut self.spaces[space].for_path(path);
2353 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2354 space.largest_acked_packet = Some(ack.largest);
2355 if let Some(info) = space.sent_packets.get(ack.largest) {
2356 space.largest_acked_packet_sent = info.time_sent;
2360 }
2361 true
2362 } else {
2363 false
2364 }
2365 };
2366
2367 if self.detect_spurious_loss(&ack, space, path) {
2368 self.path_data_mut(path)
2369 .congestion
2370 .on_spurious_congestion_event();
2371 }
2372
2373 let mut newly_acked = ArrayRangeSet::new();
2375 for range in ack.iter() {
2376 self.spaces[space].for_path(path).check_ack(range.clone())?;
2377 for (pn, _) in self.spaces[space]
2378 .for_path(path)
2379 .sent_packets
2380 .iter_range(range)
2381 {
2382 newly_acked.insert_one(pn);
2383 }
2384 }
2385
2386 if newly_acked.is_empty() {
2387 return Ok(());
2388 }
2389
2390 let mut ack_eliciting_acked = false;
2391 for packet in newly_acked.elts() {
2392 if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2393 for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2394 if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2400 pns.pending_acks.subtract_below(*acked_pn);
2401 }
2402 }
2403 ack_eliciting_acked |= info.ack_eliciting;
2404
2405 let path_data = self.path_data_mut(path);
2407 let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
2408 if mtu_updated {
2409 path_data
2410 .congestion
2411 .on_mtu_update(path_data.mtud.current_mtu());
2412 }
2413
2414 self.ack_frequency.on_acked(path, packet);
2416
2417 self.on_packet_acked(now, path, info);
2418 }
2419 }
2420
2421 let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
2422 let app_limited = self.app_limited;
2423 let path_data = self.path_data_mut(path);
2424 let in_flight = path_data.in_flight.bytes;
2425
2426 path_data
2427 .congestion
2428 .on_end_acks(now, in_flight, app_limited, largest_ackd);
2429
2430 if new_largest && ack_eliciting_acked {
2431 let ack_delay = if space != SpaceId::Data {
2432 Duration::from_micros(0)
2433 } else {
2434 cmp::min(
2435 self.ack_frequency.peer_max_ack_delay,
2436 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2437 )
2438 };
2439 let rtt = now.saturating_duration_since(
2440 self.spaces[space].for_path(path).largest_acked_packet_sent,
2441 );
2442
2443 let next_pn = self.spaces[space].for_path(path).next_packet_number;
2444 let path_data = self.path_data_mut(path);
2445 path_data.rtt.update(ack_delay, rtt);
2447 if path_data.first_packet_after_rtt_sample.is_none() {
2448 path_data.first_packet_after_rtt_sample = Some((space, next_pn));
2449 }
2450 }
2451
2452 self.detect_lost_packets(now, space, path, true);
2454
2455 if self.peer_completed_address_validation(path) {
2456 self.path_data_mut(path).pto_count = 0;
2457 }
2458
2459 if self.path_data(path).sending_ecn {
2464 if let Some(ecn) = ack.ecn {
2465 if new_largest {
2470 let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2471 self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2472 }
2473 } else {
2474 debug!("ECN not acknowledged by peer");
2476 self.path_data_mut(path).sending_ecn = false;
2477 }
2478 }
2479
2480 self.set_loss_detection_timer(now, path);
2481 Ok(())
2482 }
2483
2484 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2485 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2486
2487 if lost_packets.is_empty() {
2488 return false;
2489 }
2490
2491 for range in ack.iter() {
2492 let spurious_losses: Vec<u64> = lost_packets
2493 .iter_range(range.clone())
2494 .map(|(pn, _info)| pn)
2495 .collect();
2496
2497 for pn in spurious_losses {
2498 lost_packets.remove(pn);
2499 }
2500 }
2501
2502 lost_packets.is_empty()
2507 }
2508
2509 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2514 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2515
2516 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2517 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2518 }
2519
2520 fn process_ecn(
2522 &mut self,
2523 now: Instant,
2524 space: SpaceId,
2525 path: PathId,
2526 newly_acked: u64,
2527 ecn: frame::EcnCounts,
2528 largest_sent_time: Instant,
2529 ) {
2530 match self.spaces[space]
2531 .for_path(path)
2532 .detect_ecn(newly_acked, ecn)
2533 {
2534 Err(e) => {
2535 debug!("halting ECN due to verification failure: {}", e);
2536
2537 self.path_data_mut(path).sending_ecn = false;
2538 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2541 }
2542 Ok(false) => {}
2543 Ok(true) => {
2544 self.path_stats.entry(path).or_default().congestion_events += 1;
2545 self.path_data_mut(path).congestion.on_congestion_event(
2546 now,
2547 largest_sent_time,
2548 false,
2549 true,
2550 0,
2551 );
2552 }
2553 }
2554 }
2555
2556 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2559 self.paths
2560 .get_mut(&path_id)
2561 .expect("known path")
2562 .remove_in_flight(&info);
2563 let app_limited = self.app_limited;
2564 let path = self.path_data_mut(path_id);
2565 if info.ack_eliciting && !path.is_validating_path() {
2566 let rtt = path.rtt;
2569 path.congestion
2570 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2571 }
2572
2573 if let Some(retransmits) = info.retransmits.get() {
2575 for (id, _) in retransmits.reset_stream.iter() {
2576 self.streams.reset_acked(*id);
2577 }
2578 }
2579
2580 for frame in info.stream_frames {
2581 self.streams.received_ack_of(frame);
2582 }
2583 }
2584
2585 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2586 let start = if self.zero_rtt_crypto.is_some() {
2587 now
2588 } else {
2589 self.prev_crypto
2590 .as_ref()
2591 .expect("no previous keys")
2592 .end_packet
2593 .as_ref()
2594 .expect("update not acknowledged yet")
2595 .1
2596 };
2597
2598 self.timers.set(
2600 Timer::Conn(ConnTimer::KeyDiscard),
2601 start + self.pto_max_path(space, false) * 3,
2602 self.qlog.with_time(now),
2603 );
2604 }
2605
2606 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2619 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2620 self.detect_lost_packets(now, pn_space, path_id, false);
2622 self.set_loss_detection_timer(now, path_id);
2623 return;
2624 }
2625
2626 let (_, space) = match self.pto_time_and_space(now, path_id) {
2627 Some(x) => x,
2628 None => {
2629 error!(%path_id, "PTO expired while unset");
2630 return;
2631 }
2632 };
2633 trace!(
2634 in_flight = self.path_data(path_id).in_flight.bytes,
2635 count = self.path_data(path_id).pto_count,
2636 ?space,
2637 %path_id,
2638 "PTO fired"
2639 );
2640
2641 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2642 0 => {
2645 debug_assert!(!self.peer_completed_address_validation(path_id));
2646 1
2647 }
2648 _ => 2,
2650 };
2651 let pns = self.spaces[space].for_path(path_id);
2652 pns.loss_probes = pns.loss_probes.saturating_add(count);
2653 let path_data = self.path_data_mut(path_id);
2654 path_data.pto_count = path_data.pto_count.saturating_add(1);
2655 self.set_loss_detection_timer(now, path_id);
2656 }
2657
2658 fn detect_lost_packets(
2675 &mut self,
2676 now: Instant,
2677 pn_space: SpaceId,
2678 path_id: PathId,
2679 due_to_ack: bool,
2680 ) {
2681 let mut lost_packets = Vec::<u64>::new();
2682 let mut lost_mtu_probe = None;
2683 let mut in_persistent_congestion = false;
2684 let mut size_of_lost_packets = 0u64;
2685 self.spaces[pn_space].for_path(path_id).loss_time = None;
2686
2687 let path = self.path_data(path_id);
2690 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2691 let loss_delay = path
2692 .rtt
2693 .conservative()
2694 .mul_f32(self.config.time_threshold)
2695 .max(TIMER_GRANULARITY);
2696 let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
2697
2698 let largest_acked_packet = self.spaces[pn_space]
2699 .for_path(path_id)
2700 .largest_acked_packet
2701 .expect("detect_lost_packets only to be called if path received at least one ACK");
2702 let packet_threshold = self.config.packet_threshold as u64;
2703
2704 let congestion_period = self
2708 .pto(SpaceId::Data, path_id)
2709 .saturating_mul(self.config.persistent_congestion_threshold);
2710 let mut persistent_congestion_start: Option<Instant> = None;
2711 let mut prev_packet = None;
2712 let space = self.spaces[pn_space].for_path(path_id);
2713
2714 for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
2715 if prev_packet != Some(packet.wrapping_sub(1)) {
2716 persistent_congestion_start = None;
2718 }
2719
2720 let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
2724 if packet_too_old || largest_acked_packet >= packet + packet_threshold {
2725 if Some(packet) == in_flight_mtu_probe {
2727 lost_mtu_probe = in_flight_mtu_probe;
2730 } else {
2731 lost_packets.push(packet);
2732 size_of_lost_packets += info.size as u64;
2733 if info.ack_eliciting && due_to_ack {
2734 match persistent_congestion_start {
2735 Some(start) if info.time_sent - start > congestion_period => {
2738 in_persistent_congestion = true;
2739 }
2740 None if first_packet_after_rtt_sample
2742 .is_some_and(|x| x < (pn_space, packet)) =>
2743 {
2744 persistent_congestion_start = Some(info.time_sent);
2745 }
2746 _ => {}
2747 }
2748 }
2749 }
2750 } else {
2751 if space.loss_time.is_none() {
2753 space.loss_time = Some(info.time_sent + loss_delay);
2756 }
2757 persistent_congestion_start = None;
2758 }
2759
2760 prev_packet = Some(packet);
2761 }
2762
2763 self.handle_lost_packets(
2764 pn_space,
2765 path_id,
2766 now,
2767 lost_packets,
2768 lost_mtu_probe,
2769 loss_delay,
2770 in_persistent_congestion,
2771 size_of_lost_packets,
2772 );
2773 }
2774
2775 fn discard_path(&mut self, path_id: PathId, now: Instant) {
2777 trace!(%path_id, "dropping path state");
2778 let path = self.path_data(path_id);
2779 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2780
2781 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2783 .for_path(path_id)
2784 .sent_packets
2785 .iter()
2786 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2787 .map(|(pn, info)| {
2788 size_of_lost_packets += info.size as u64;
2789 pn
2790 })
2791 .collect();
2792
2793 if !lost_pns.is_empty() {
2794 trace!(
2795 %path_id,
2796 count = lost_pns.len(),
2797 lost_bytes = size_of_lost_packets,
2798 "packets lost on path abandon"
2799 );
2800 self.handle_lost_packets(
2801 SpaceId::Data,
2802 path_id,
2803 now,
2804 lost_pns,
2805 in_flight_mtu_probe,
2806 Duration::ZERO,
2807 false,
2808 size_of_lost_packets,
2809 );
2810 }
2811 self.paths.remove(&path_id);
2812 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2813
2814 let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2815 self.events.push_back(
2816 PathEvent::Abandoned {
2817 id: path_id,
2818 path_stats,
2819 }
2820 .into(),
2821 );
2822 }
2823
2824 fn handle_lost_packets(
2825 &mut self,
2826 pn_space: SpaceId,
2827 path_id: PathId,
2828 now: Instant,
2829 lost_packets: Vec<u64>,
2830 lost_mtu_probe: Option<u64>,
2831 loss_delay: Duration,
2832 in_persistent_congestion: bool,
2833 size_of_lost_packets: u64,
2834 ) {
2835 debug_assert!(
2836 {
2837 let mut sorted = lost_packets.clone();
2838 sorted.sort();
2839 sorted == lost_packets
2840 },
2841 "lost_packets must be sorted"
2842 );
2843
2844 self.drain_lost_packets(now, pn_space, path_id);
2845
2846 if let Some(largest_lost) = lost_packets.last().cloned() {
2848 let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
2849 let largest_lost_sent = self.spaces[pn_space]
2850 .for_path(path_id)
2851 .sent_packets
2852 .get(largest_lost)
2853 .unwrap()
2854 .time_sent;
2855 let path_stats = self.path_stats.entry(path_id).or_default();
2856 path_stats.lost_packets += lost_packets.len() as u64;
2857 path_stats.lost_bytes += size_of_lost_packets;
2858 trace!(
2859 %path_id,
2860 count = lost_packets.len(),
2861 lost_bytes = size_of_lost_packets,
2862 "packets lost",
2863 );
2864
2865 for &packet in &lost_packets {
2866 let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
2867 continue;
2868 };
2869 self.qlog
2870 .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
2871 self.paths
2872 .get_mut(&path_id)
2873 .unwrap()
2874 .remove_in_flight(&info);
2875
2876 for frame in info.stream_frames {
2877 self.streams.retransmit(frame);
2878 }
2879 self.spaces[pn_space].pending |= info.retransmits;
2880 self.path_data_mut(path_id)
2881 .mtud
2882 .on_non_probe_lost(packet, info.size);
2883
2884 self.spaces[pn_space].for_path(path_id).lost_packets.insert(
2885 packet,
2886 LostPacket {
2887 time_sent: info.time_sent,
2888 },
2889 );
2890 }
2891
2892 let path = self.path_data_mut(path_id);
2893 if path.mtud.black_hole_detected(now) {
2894 path.congestion.on_mtu_update(path.mtud.current_mtu());
2895 if let Some(max_datagram_size) = self.datagrams().max_size() {
2896 self.datagrams.drop_oversized(max_datagram_size);
2897 }
2898 self.path_stats
2899 .entry(path_id)
2900 .or_default()
2901 .black_holes_detected += 1;
2902 }
2903
2904 let lost_ack_eliciting =
2906 old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
2907
2908 if lost_ack_eliciting {
2909 self.path_stats
2910 .entry(path_id)
2911 .or_default()
2912 .congestion_events += 1;
2913 self.path_data_mut(path_id).congestion.on_congestion_event(
2914 now,
2915 largest_lost_sent,
2916 in_persistent_congestion,
2917 false,
2918 size_of_lost_packets,
2919 );
2920 }
2921 }
2922
2923 if let Some(packet) = lost_mtu_probe {
2925 let info = self.spaces[SpaceId::Data]
2926 .for_path(path_id)
2927 .take(packet)
2928 .unwrap(); self.paths
2931 .get_mut(&path_id)
2932 .unwrap()
2933 .remove_in_flight(&info);
2934 self.path_data_mut(path_id).mtud.on_probe_lost();
2935 self.path_stats
2936 .entry(path_id)
2937 .or_default()
2938 .lost_plpmtud_probes += 1;
2939 }
2940 }
2941
2942 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2948 SpaceId::iter()
2949 .filter_map(|id| {
2950 self.spaces[id]
2951 .number_spaces
2952 .get(&path_id)
2953 .and_then(|pns| pns.loss_time)
2954 .map(|time| (time, id))
2955 })
2956 .min_by_key(|&(time, _)| time)
2957 }
2958
2959 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2961 let path = self.path(path_id)?;
2962 let pto_count = path.pto_count;
2963 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2964 let mut duration = path.rtt.pto_base() * backoff;
2965
2966 if path_id == PathId::ZERO
2967 && path.in_flight.ack_eliciting == 0
2968 && !self.peer_completed_address_validation(PathId::ZERO)
2969 {
2970 let space = match self.highest_space {
2976 SpaceId::Handshake => SpaceId::Handshake,
2977 _ => SpaceId::Initial,
2978 };
2979
2980 return Some((now + duration, space));
2981 }
2982
2983 let mut result = None;
2984 for space in SpaceId::iter() {
2985 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2986 continue;
2987 };
2988
2989 if !pns.has_in_flight() {
2990 continue;
2991 }
2992 if space == SpaceId::Data {
2993 if self.is_handshaking() {
2995 return result;
2996 }
2997 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2999 }
3000 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3001 continue;
3002 };
3003 let pto = last_ack_eliciting + duration;
3004 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3005 if path.anti_amplification_blocked(1) {
3006 continue;
3008 }
3009 if path.in_flight.ack_eliciting == 0 {
3010 continue;
3012 }
3013 result = Some((pto, space));
3014 }
3015 }
3016 result
3017 }
3018
3019 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3020 if self.side.is_server() || self.state.is_closed() {
3022 return true;
3023 }
3024 self.spaces[SpaceId::Handshake]
3027 .path_space(PathId::ZERO)
3028 .and_then(|pns| pns.largest_acked_packet)
3029 .is_some()
3030 || self.spaces[SpaceId::Data]
3031 .path_space(path)
3032 .and_then(|pns| pns.largest_acked_packet)
3033 .is_some()
3034 || (self.spaces[SpaceId::Data].crypto.is_some()
3035 && self.spaces[SpaceId::Handshake].crypto.is_none())
3036 }
3037
3038 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3046 if self.state.is_closed() {
3047 return;
3051 }
3052
3053 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3054 self.timers.set(
3056 Timer::PerPath(path_id, PathTimer::LossDetection),
3057 loss_time,
3058 self.qlog.with_time(now),
3059 );
3060 return;
3061 }
3062
3063 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3066 self.timers.set(
3067 Timer::PerPath(path_id, PathTimer::LossDetection),
3068 timeout,
3069 self.qlog.with_time(now),
3070 );
3071 } else {
3072 self.timers.stop(
3073 Timer::PerPath(path_id, PathTimer::LossDetection),
3074 self.qlog.with_time(now),
3075 );
3076 }
3077 }
3078
3079 fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3085 match space {
3086 SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3087 SpaceId::Data => self
3088 .paths
3089 .iter()
3090 .filter_map(|(path_id, state)| {
3091 if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3092 None
3094 } else {
3095 let pto = self.pto(space, *path_id);
3096 Some(pto)
3097 }
3098 })
3099 .max()
3100 .expect("there should be one at least path"),
3101 }
3102 }
3103
3104 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3109 let max_ack_delay = match space {
3110 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3111 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3112 };
3113 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3114 }
3115
3116 fn on_packet_authenticated(
3117 &mut self,
3118 now: Instant,
3119 space_id: SpaceId,
3120 path_id: PathId,
3121 ecn: Option<EcnCodepoint>,
3122 packet: Option<u64>,
3123 spin: bool,
3124 is_1rtt: bool,
3125 ) {
3126 self.total_authed_packets += 1;
3127 if let Some(last_allowed_receive) = self
3128 .paths
3129 .get(&path_id)
3130 .and_then(|path| path.data.last_allowed_receive)
3131 {
3132 if now > last_allowed_receive {
3133 warn!("received data on path which we abandoned more than 3 * PTO ago");
3134 if !self.state.is_closed() {
3136 self.state.move_to_closed(TransportError::NO_ERROR(
3138 "peer failed to respond with PATH_ABANDON in time",
3139 ));
3140 self.close_common();
3141 self.set_close_timer(now);
3142 self.close = true;
3143 }
3144 return;
3145 }
3146 }
3147
3148 self.reset_keep_alive(path_id, now);
3149 self.reset_idle_timeout(now, space_id, path_id);
3150 self.permit_idle_reset = true;
3151 self.receiving_ecn |= ecn.is_some();
3152 if let Some(x) = ecn {
3153 let space = &mut self.spaces[space_id];
3154 space.for_path(path_id).ecn_counters += x;
3155
3156 if x.is_ce() {
3157 space
3158 .for_path(path_id)
3159 .pending_acks
3160 .set_immediate_ack_required();
3161 }
3162 }
3163
3164 let packet = match packet {
3165 Some(x) => x,
3166 None => return,
3167 };
3168 match &self.side {
3169 ConnectionSide::Client { .. } => {
3170 if space_id == SpaceId::Handshake {
3174 if let Some(hs) = self.state.as_handshake_mut() {
3175 hs.allow_server_migration = false;
3176 }
3177 }
3178 }
3179 ConnectionSide::Server { .. } => {
3180 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3181 {
3182 self.discard_space(now, SpaceId::Initial);
3184 }
3185 if self.zero_rtt_crypto.is_some() && is_1rtt {
3186 self.set_key_discard_timer(now, space_id)
3188 }
3189 }
3190 }
3191 let space = self.spaces[space_id].for_path(path_id);
3192 space.pending_acks.insert_one(packet, now);
3193 if packet >= space.rx_packet.unwrap_or_default() {
3194 space.rx_packet = Some(packet);
3195 self.spin = self.side.is_client() ^ spin;
3197 }
3198 }
3199
3200 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3205 if let Some(timeout) = self.idle_timeout {
3207 if self.state.is_closed() {
3208 self.timers
3209 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3210 } else {
3211 let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3212 self.timers.set(
3213 Timer::Conn(ConnTimer::Idle),
3214 now + dt,
3215 self.qlog.with_time(now),
3216 );
3217 }
3218 }
3219
3220 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3222 if self.state.is_closed() {
3223 self.timers.stop(
3224 Timer::PerPath(path_id, PathTimer::PathIdle),
3225 self.qlog.with_time(now),
3226 );
3227 } else {
3228 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3229 self.timers.set(
3230 Timer::PerPath(path_id, PathTimer::PathIdle),
3231 now + dt,
3232 self.qlog.with_time(now),
3233 );
3234 }
3235 }
3236 }
3237
3238 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3240 if !self.state.is_established() {
3241 return;
3242 }
3243
3244 if let Some(interval) = self.config.keep_alive_interval {
3245 self.timers.set(
3246 Timer::Conn(ConnTimer::KeepAlive),
3247 now + interval,
3248 self.qlog.with_time(now),
3249 );
3250 }
3251
3252 if let Some(interval) = self.path_data(path_id).keep_alive {
3253 self.timers.set(
3254 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3255 now + interval,
3256 self.qlog.with_time(now),
3257 );
3258 }
3259 }
3260
3261 fn reset_cid_retirement(&mut self, now: Instant) {
3263 if let Some((_path, t)) = self.next_cid_retirement() {
3264 self.timers.set(
3265 Timer::Conn(ConnTimer::PushNewCid),
3266 t,
3267 self.qlog.with_time(now),
3268 );
3269 }
3270 }
3271
3272 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3274 self.local_cid_state
3275 .iter()
3276 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3277 .min_by_key(|(_path_id, timeout)| *timeout)
3278 }
3279
3280 pub(crate) fn handle_first_packet(
3285 &mut self,
3286 now: Instant,
3287 remote: SocketAddr,
3288 ecn: Option<EcnCodepoint>,
3289 packet_number: u64,
3290 packet: InitialPacket,
3291 remaining: Option<BytesMut>,
3292 ) -> Result<(), ConnectionError> {
3293 let span = trace_span!("first recv");
3294 let _guard = span.enter();
3295 debug_assert!(self.side.is_server());
3296 let len = packet.header_data.len() + packet.payload.len();
3297 let path_id = PathId::ZERO;
3298 self.path_data_mut(path_id).total_recvd = len as u64;
3299
3300 if let Some(hs) = self.state.as_handshake_mut() {
3301 hs.expected_token = packet.header.token.clone();
3302 } else {
3303 unreachable!("first packet must be delivered in Handshake state");
3304 }
3305
3306 self.on_packet_authenticated(
3308 now,
3309 SpaceId::Initial,
3310 path_id,
3311 ecn,
3312 Some(packet_number),
3313 false,
3314 false,
3315 );
3316
3317 let packet: Packet = packet.into();
3318
3319 let mut qlog = QlogRecvPacket::new(len);
3320 qlog.header(&packet.header, Some(packet_number), path_id);
3321
3322 self.process_decrypted_packet(
3323 now,
3324 remote,
3325 path_id,
3326 Some(packet_number),
3327 packet,
3328 &mut qlog,
3329 )?;
3330 self.qlog.emit_packet_received(qlog, now);
3331 if let Some(data) = remaining {
3332 self.handle_coalesced(now, remote, path_id, ecn, data);
3333 }
3334
3335 self.qlog.emit_recovery_metrics(
3336 path_id,
3337 &mut self.paths.get_mut(&path_id).unwrap().data,
3338 now,
3339 );
3340
3341 Ok(())
3342 }
3343
3344 fn init_0rtt(&mut self, now: Instant) {
3345 let (header, packet) = match self.crypto.early_crypto() {
3346 Some(x) => x,
3347 None => return,
3348 };
3349 if self.side.is_client() {
3350 match self.crypto.transport_parameters() {
3351 Ok(params) => {
3352 let params = params
3353 .expect("crypto layer didn't supply transport parameters with ticket");
3354 let params = TransportParameters {
3356 initial_src_cid: None,
3357 original_dst_cid: None,
3358 preferred_address: None,
3359 retry_src_cid: None,
3360 stateless_reset_token: None,
3361 min_ack_delay: None,
3362 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3363 max_ack_delay: TransportParameters::default().max_ack_delay,
3364 initial_max_path_id: None,
3365 ..params
3366 };
3367 self.set_peer_params(params);
3368 self.qlog.emit_peer_transport_params_restored(self, now);
3369 }
3370 Err(e) => {
3371 error!("session ticket has malformed transport parameters: {}", e);
3372 return;
3373 }
3374 }
3375 }
3376 trace!("0-RTT enabled");
3377 self.zero_rtt_enabled = true;
3378 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3379 }
3380
3381 fn read_crypto(
3382 &mut self,
3383 space: SpaceId,
3384 crypto: &frame::Crypto,
3385 payload_len: usize,
3386 ) -> Result<(), TransportError> {
3387 let expected = if !self.state.is_handshake() {
3388 SpaceId::Data
3389 } else if self.highest_space == SpaceId::Initial {
3390 SpaceId::Initial
3391 } else {
3392 SpaceId::Handshake
3395 };
3396 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3400
3401 let end = crypto.offset + crypto.data.len() as u64;
3402 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3403 warn!(
3404 "received new {:?} CRYPTO data when expecting {:?}",
3405 space, expected
3406 );
3407 return Err(TransportError::PROTOCOL_VIOLATION(
3408 "new data at unexpected encryption level",
3409 ));
3410 }
3411
3412 let space = &mut self.spaces[space];
3413 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3414 if max > self.config.crypto_buffer_size as u64 {
3415 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3416 }
3417
3418 space
3419 .crypto_stream
3420 .insert(crypto.offset, crypto.data.clone(), payload_len);
3421 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3422 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3423 if self.crypto.read_handshake(&chunk.bytes)? {
3424 self.events.push_back(Event::HandshakeDataReady);
3425 }
3426 }
3427
3428 Ok(())
3429 }
3430
3431 fn write_crypto(&mut self) {
3432 loop {
3433 let space = self.highest_space;
3434 let mut outgoing = Vec::new();
3435 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3436 match space {
3437 SpaceId::Initial => {
3438 self.upgrade_crypto(SpaceId::Handshake, crypto);
3439 }
3440 SpaceId::Handshake => {
3441 self.upgrade_crypto(SpaceId::Data, crypto);
3442 }
3443 _ => unreachable!("got updated secrets during 1-RTT"),
3444 }
3445 }
3446 if outgoing.is_empty() {
3447 if space == self.highest_space {
3448 break;
3449 } else {
3450 continue;
3452 }
3453 }
3454 let offset = self.spaces[space].crypto_offset;
3455 let outgoing = Bytes::from(outgoing);
3456 if let Some(hs) = self.state.as_handshake_mut() {
3457 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3458 hs.client_hello = Some(outgoing.clone());
3459 }
3460 }
3461 self.spaces[space].crypto_offset += outgoing.len() as u64;
3462 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3463 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3464 offset,
3465 data: outgoing,
3466 });
3467 }
3468 }
3469
3470 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3472 debug_assert!(
3473 self.spaces[space].crypto.is_none(),
3474 "already reached packet space {space:?}"
3475 );
3476 trace!("{:?} keys ready", space);
3477 if space == SpaceId::Data {
3478 self.next_crypto = Some(
3480 self.crypto
3481 .next_1rtt_keys()
3482 .expect("handshake should be complete"),
3483 );
3484 }
3485
3486 self.spaces[space].crypto = Some(crypto);
3487 debug_assert!(space as usize > self.highest_space as usize);
3488 self.highest_space = space;
3489 if space == SpaceId::Data && self.side.is_client() {
3490 self.zero_rtt_crypto = None;
3492 }
3493 }
3494
3495 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3496 debug_assert!(space_id != SpaceId::Data);
3497 trace!("discarding {:?} keys", space_id);
3498 if space_id == SpaceId::Initial {
3499 if let ConnectionSide::Client { token, .. } = &mut self.side {
3501 *token = Bytes::new();
3502 }
3503 }
3504 let space = &mut self.spaces[space_id];
3505 space.crypto = None;
3506 let pns = space.for_path(PathId::ZERO);
3507 pns.time_of_last_ack_eliciting_packet = None;
3508 pns.loss_time = None;
3509 pns.loss_probes = 0;
3510 let sent_packets = mem::take(&mut pns.sent_packets);
3511 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3512 for (_, packet) in sent_packets.into_iter() {
3513 path.data.remove_in_flight(&packet);
3514 }
3515
3516 self.set_loss_detection_timer(now, PathId::ZERO)
3517 }
3518
3519 fn handle_coalesced(
3520 &mut self,
3521 now: Instant,
3522 remote: SocketAddr,
3523 path_id: PathId,
3524 ecn: Option<EcnCodepoint>,
3525 data: BytesMut,
3526 ) {
3527 self.path_data_mut(path_id)
3528 .inc_total_recvd(data.len() as u64);
3529 let mut remaining = Some(data);
3530 let cid_len = self
3531 .local_cid_state
3532 .values()
3533 .map(|cid_state| cid_state.cid_len())
3534 .next()
3535 .expect("one cid_state must exist");
3536 while let Some(data) = remaining {
3537 match PartialDecode::new(
3538 data,
3539 &FixedLengthConnectionIdParser::new(cid_len),
3540 &[self.version],
3541 self.endpoint_config.grease_quic_bit,
3542 ) {
3543 Ok((partial_decode, rest)) => {
3544 remaining = rest;
3545 self.handle_decode(now, remote, path_id, ecn, partial_decode);
3546 }
3547 Err(e) => {
3548 trace!("malformed header: {}", e);
3549 return;
3550 }
3551 }
3552 }
3553 }
3554
3555 fn handle_decode(
3556 &mut self,
3557 now: Instant,
3558 remote: SocketAddr,
3559 path_id: PathId,
3560 ecn: Option<EcnCodepoint>,
3561 partial_decode: PartialDecode,
3562 ) {
3563 let qlog = QlogRecvPacket::new(partial_decode.len());
3564 if let Some(decoded) = packet_crypto::unprotect_header(
3565 partial_decode,
3566 &self.spaces,
3567 self.zero_rtt_crypto.as_ref(),
3568 self.peer_params.stateless_reset_token,
3569 ) {
3570 self.handle_packet(
3571 now,
3572 remote,
3573 path_id,
3574 ecn,
3575 decoded.packet,
3576 decoded.stateless_reset,
3577 qlog,
3578 );
3579 }
3580 }
3581
/// Handles a single incoming packet after header protection was removed.
///
/// Steps, in order:
/// 1. During the handshake, drop packets for paths other than path zero and
///    packets from an unexpected remote (unless server migration is allowed).
/// 2. Decrypt the packet. A stateless reset takes precedence over the decrypt
///    result.
/// 3. On success: drop duplicates, short packets during the handshake and
///    Initials with a mismatching token; otherwise run the post-authentication
///    bookkeeping and process the packet contents.
/// 4. Translate a resulting `ConnectionError` into a state transition.
/// 5. React to transitions into the closed / drained states.
///
/// `packet` is `None` when no packet could be obtained from the datagram;
/// `stateless_reset` indicates that the datagram was recognised as a
/// stateless reset.
fn handle_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    path_id: PathId,
    ecn: Option<EcnCodepoint>,
    packet: Option<Packet>,
    stateless_reset: bool,
    mut qlog: QlogRecvPacket,
) {
    self.stats.udp_rx.ios += 1;
    if let Some(ref packet) = packet {
        trace!(
            "got {:?} packet ({} bytes) from {} using id {}",
            packet.header.space(),
            packet.payload.len() + packet.header_data.len(),
            remote,
            packet.header.dst_cid(),
        );
    }

    if self.is_handshaking() {
        // Only path zero exists while the handshake is in progress.
        if path_id != PathId::ZERO {
            debug!(%remote, %path_id, "discarding multipath packet during handshake");
            return;
        }
        if remote != self.path_data_mut(path_id).remote {
            if let Some(hs) = self.state.as_handshake() {
                if hs.allow_server_migration {
                    // Adopt the new remote address for this path.
                    trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
                    self.path_data_mut(path_id).remote = remote;
                    self.qlog.emit_tuple_assigned(path_id, remote, now);
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            } else {
                debug!("discarding packet with unexpected remote during handshake");
                return;
            }
        }
    }

    // Remember the state before processing so that transitions caused by this
    // packet can be detected at the end.
    let was_closed = self.state.is_closed();
    let was_drained = self.state.is_drained();

    // `Err(None)` stands for "could not be authenticated" (including the case
    // of no packet at all); `Err(Some(_))` carries an error from decryption.
    let decrypted = match packet {
        None => Err(None),
        Some(mut packet) => self
            .decrypt_packet(now, path_id, &mut packet)
            .map(move |number| (packet, number)),
    };
    let result = match decrypted {
        // Checked first: a stateless reset wins regardless of the decrypt result.
        _ if stateless_reset => {
            debug!("got stateless reset");
            Err(ConnectionError::Reset)
        }
        Err(Some(e)) => {
            warn!("illegal packet: {}", e);
            Err(e.into())
        }
        Err(None) => {
            debug!("failed to authenticate packet");
            self.authentication_failures += 1;
            // Authentication failures are only fatal once they exceed the
            // integrity limit of the keys of the highest space.
            let integrity_limit = self.spaces[self.highest_space]
                .crypto
                .as_ref()
                .unwrap()
                .packet
                .local
                .integrity_limit();
            if self.authentication_failures > integrity_limit {
                Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
            } else {
                return;
            }
        }
        Ok((packet, number)) => {
            qlog.header(&packet.header, number, path_id);
            let span = match number {
                Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                None => trace_span!("recv", space = ?packet.header.space()),
            };
            let _guard = span.enter();

            // A `true` result from `dedup.insert` is treated as a (possible)
            // duplicate. Packets without a number, or for a path without a
            // packet number space, skip this check.
            let dedup = self.spaces[packet.header.space()]
                .path_space_mut(path_id)
                .map(|pns| &mut pns.dedup);
            if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                debug!("discarding possible duplicate packet");
                self.qlog.emit_packet_received(qlog, now);
                return;
            } else if self.state.is_handshake() && packet.header.is_short() {
                trace!("dropping short packet during handshake");
                self.qlog.emit_packet_received(qlog, now);
                return;
            } else {
                // A server in the handshake state only accepts Initials whose
                // token equals the one it expects.
                if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                    if let Some(hs) = self.state.as_handshake() {
                        if self.side.is_server() && token != &hs.expected_token {
                            warn!("discarding Initial with invalid retry token");
                            self.qlog.emit_packet_received(qlog, now);
                            return;
                        }
                    }
                }

                if !self.state.is_closed() {
                    // Only short-header packets carry a spin bit.
                    let spin = match packet.header {
                        Header::Short { spin, .. } => spin,
                        _ => false,
                    };

                    // A server makes sure the path exists, unless that path
                    // was abandoned.
                    if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                        self.ensure_path(path_id, remote, now, number);
                    }
                    if self.paths.contains_key(&path_id) {
                        self.on_packet_authenticated(
                            now,
                            packet.header.space(),
                            path_id,
                            ecn,
                            number,
                            spin,
                            packet.header.is_1rtt(),
                        );
                    }
                }

                let res = self
                    .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);

                self.qlog.emit_packet_received(qlog, now);
                res
            }
        }
    };

    // Map the error to the matching state transition.
    if let Err(conn_err) = result {
        match conn_err {
            ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
            ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
            // These two go straight to drained, bypassing the closed state.
            ConnectionError::Reset
            | ConnectionError::TransportError(TransportError {
                code: TransportErrorCode::AEAD_LIMIT_REACHED,
                ..
            }) => {
                self.state.move_to_drained(Some(conn_err));
            }
            ConnectionError::TimedOut => {
                unreachable!("timeouts aren't generated by packet processing");
            }
            ConnectionError::TransportError(err) => {
                debug!("closing connection due to transport error: {}", err);
                self.state.move_to_closed(err);
            }
            ConnectionError::VersionMismatch => {
                self.state.move_to_draining(Some(conn_err));
            }
            ConnectionError::LocallyClosed => {
                unreachable!("LocallyClosed isn't generated by packet processing");
            }
            ConnectionError::CidsExhausted => {
                unreachable!("CidsExhausted isn't generated by packet processing");
            }
        };
    }

    // Newly closed by this packet: run the common close handling and, unless
    // already drained, start the close timer.
    if !was_closed && self.state.is_closed() {
        self.close_common();
        if !self.state.is_drained() {
            self.set_close_timer(now);
        }
    }
    // Newly drained by this packet: notify the endpoint and stop the close timer.
    if !was_drained && self.state.is_drained() {
        self.endpoint_events.push_back(EndpointEventInner::Drained);
        self.timers
            .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
    }

    // While in the closed state, `close` is only set when the packet came from
    // the remote address of its path (or when the path is unknown).
    if matches!(self.state.as_type(), StateType::Closed) {
        let path_remote = self
            .paths
            .get(&path_id)
            .map(|p| p.data.remote)
            .unwrap_or(remote);
        self.close = remote == path_remote;
    }
}
3783
/// Processes the contents of a decrypted packet according to the connection state.
///
/// - Unknown path: the packet is discarded.
/// - Established: Data packets go to `process_payload`, other packets with
///   frames to `process_early_payload`.
/// - Closed: frames are only scanned for a close frame, which moves the
///   connection to draining.
/// - Draining / Drained: the packet is ignored.
/// - Handshake: dispatches on the header type (Retry, Handshake, Initial,
///   0-RTT, Version Negotiation).
///
/// # Errors
///
/// Returns a `ConnectionError` for protocol violations, missing or invalid
/// transport parameters, a version mismatch, or errors from payload processing.
///
/// # Panics
///
/// Panics if a Data-space or 0-RTT packet has no packet number, or if a short
/// header packet arrives here while in the handshake state.
fn process_decrypted_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    path_id: PathId,
    number: Option<u64>,
    packet: Packet,
    qlog: &mut QlogRecvPacket,
) -> Result<(), ConnectionError> {
    if !self.paths.contains_key(&path_id) {
        trace!(%path_id, ?number, "discarding packet for unknown path");
        return Ok(());
    }
    // Every state except Handshake is fully handled inside this match; only the
    // handshake state falls through to the header dispatch below.
    let state = match self.state.as_type() {
        StateType::Established => {
            match packet.header.space() {
                SpaceId::Data => {
                    self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
                }
                _ if packet.header.has_frames() => {
                    self.process_early_payload(now, path_id, packet, qlog)?
                }
                _ => {
                    trace!("discarding unexpected pre-handshake packet");
                }
            }
            return Ok(());
        }
        StateType::Closed => {
            // Frame decoding errors are only logged here; the sole frame
            // acted upon is a close frame.
            for result in frame::Iter::new(packet.payload.freeze())? {
                let frame = match result {
                    Ok(frame) => frame,
                    Err(err) => {
                        debug!("frame decoding error: {err:?}");
                        continue;
                    }
                };
                qlog.frame(&frame);

                if let Frame::Padding = frame {
                    continue;
                };

                self.stats.frame_rx.record(&frame);

                if let Frame::Close(_error) = frame {
                    self.state.move_to_draining(None);
                    break;
                }
            }
            return Ok(());
        }
        StateType::Draining | StateType::Drained => return Ok(()),
        StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
    };

    match packet.header {
        Header::Retry {
            src_cid: rem_cid, ..
        } => {
            debug_assert_eq!(path_id, PathId::ZERO);
            if self.side.is_server() {
                return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
            }

            // Validate against the currently active remote CID of this path;
            // a missing CID entry counts as invalid.
            let is_valid_retry = self
                .rem_cids
                .get(&path_id)
                .map(|cids| cids.active())
                .map(|orig_dst_cid| {
                    self.crypto.is_valid_retry(
                        orig_dst_cid,
                        &packet.header_data,
                        &packet.payload,
                    )
                })
                .unwrap_or_default();
            // A Retry is ignored once another packet was already authenticated,
            // when the payload is too short to hold a token in addition to its
            // trailing 16 bytes, or when validation fails.
            if self.total_authed_packets > 1
                || packet.payload.len() <= 16 || !is_valid_retry
            {
                trace!("discarding invalid Retry");
                return Ok(());
            }

            trace!("retrying with CID {}", rem_cid);
            let client_hello = state.client_hello.take().unwrap();
            self.retry_src_cid = Some(rem_cid);
            self.rem_cids
                .get_mut(&path_id)
                .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
                .update_initial_cid(rem_cid);
            self.rem_handshake_cid = rem_cid;

            // Treat Initial packet number 0 on path zero, if still tracked, as acknowledged.
            let space = &mut self.spaces[SpaceId::Initial];
            if let Some(info) = space.for_path(PathId::ZERO).take(0) {
                self.on_packet_acked(now, PathId::ZERO, info);
            };

            self.discard_space(now, SpaceId::Initial);
            // Rebuild the Initial space with keys derived from the new CID,
            // keeping the next packet number and re-queueing the client hello.
            self.spaces[SpaceId::Initial] = {
                let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
                space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
                space.crypto_offset = client_hello.len() as u64;
                space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
                    .for_path(path_id)
                    .next_packet_number;
                space.pending.crypto.push_back(frame::Crypto {
                    offset: 0,
                    data: client_hello,
                });
                space
            };

            // Data-space packets sent so far (0-RTT at this point) are taken
            // out of flight and their contents queued for retransmission.
            let zero_rtt = mem::take(
                &mut self.spaces[SpaceId::Data]
                    .for_path(PathId::ZERO)
                    .sent_packets,
            );
            for (_, info) in zero_rtt.into_iter() {
                self.paths
                    .get_mut(&PathId::ZERO)
                    .unwrap()
                    .remove_in_flight(&info);
                self.spaces[SpaceId::Data].pending |= info.retransmits;
            }
            self.streams.retransmit_all_for_0rtt();

            // The token is the payload without its last 16 bytes
            // (presumably the Retry integrity tag; confirm against the packet format).
            let token_len = packet.payload.len() - 16;
            let ConnectionSide::Client { ref mut token, .. } = self.side else {
                unreachable!("we already short-circuited if we're server");
            };
            *token = packet.payload.freeze().split_to(token_len);

            // Restart the handshake state from scratch.
            self.state = State::handshake(state::Handshake {
                expected_token: Bytes::new(),
                rem_cid_set: false,
                client_hello: None,
                allow_server_migration: true,
            });
            Ok(())
        }
        Header::Long {
            ty: LongType::Handshake,
            src_cid: rem_cid,
            dst_cid: loc_cid,
            ..
        } => {
            debug_assert_eq!(path_id, PathId::ZERO);
            if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }
            self.on_path_validated(path_id);

            self.process_early_payload(now, path_id, packet, qlog)?;
            if self.state.is_closed() {
                return Ok(());
            }

            if self.crypto.is_handshaking() {
                trace!("handshake ongoing");
                return Ok(());
            }

            // The crypto handshake has finished from here on.
            if self.side.is_client() {
                let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                    TransportError::new(
                        TransportErrorCode::crypto(0x6d),
                        "transport parameters missing".to_owned(),
                    )
                })?;

                if self.has_0rtt() {
                    if !self.crypto.early_data_accepted().unwrap() {
                        debug_assert!(self.side.is_client());
                        debug!("0-RTT rejected");
                        self.accepted_0rtt = false;
                        self.streams.zero_rtt_rejected();

                        // Forget everything queued or sent as 0-RTT data.
                        self.spaces[SpaceId::Data].pending = Retransmits::default();

                        let sent_packets = mem::take(
                            &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
                        );
                        for (_, packet) in sent_packets.into_iter() {
                            self.paths
                                .get_mut(&path_id)
                                .unwrap()
                                .remove_in_flight(&packet);
                        }
                    } else {
                        self.accepted_0rtt = true;
                        // The new parameters must be compatible with the ones
                        // that were in use for 0-RTT.
                        params.validate_resumption_from(&self.peer_params)?;
                    }
                }
                if let Some(token) = params.stateless_reset_token {
                    let remote = self.path_data(path_id).remote;
                    self.endpoint_events
                        .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
                }
                self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                self.issue_first_cids(now);
            } else {
                // Server side: queue HANDSHAKE_DONE and drop the Handshake space.
                self.spaces[SpaceId::Data].pending.handshake_done = true;
                self.discard_space(now, SpaceId::Handshake);
                self.events.push_back(Event::HandshakeConfirmed);
                trace!("handshake confirmed");
            }

            self.events.push_back(Event::Connected);
            self.state.move_to_established();
            trace!("established");

            self.issue_first_path_cids(now);
            Ok(())
        }
        Header::Initial(InitialHeader {
            src_cid: rem_cid,
            dst_cid: loc_cid,
            ..
        }) => {
            debug_assert_eq!(path_id, PathId::ZERO);
            if !state.rem_cid_set {
                // First Initial seen: adopt the peer's source CID.
                trace!("switching remote CID to {}", rem_cid);
                let mut state = state.clone();
                self.rem_cids
                    .get_mut(&path_id)
                    .expect("PathId::ZERO not yet abandoned")
                    .update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;
                self.orig_rem_cid = rem_cid;
                state.rem_cid_set = true;
                self.state.move_to_handshake(state);
            } else if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }

            let starting_space = self.highest_space;
            self.process_early_payload(now, path_id, packet, qlog)?;

            // On the server, the peer's transport parameters are handled once
            // this packet advanced the connection past the Initial space.
            if self.side.is_server()
                && starting_space == SpaceId::Initial
                && self.highest_space != SpaceId::Initial
            {
                let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                    TransportError::new(
                        TransportErrorCode::crypto(0x6d),
                        "transport parameters missing".to_owned(),
                    )
                })?;
                self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                self.issue_first_cids(now);
                self.init_0rtt(now);
            }
            Ok(())
        }
        Header::Long {
            ty: LongType::ZeroRtt,
            ..
        } => {
            self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
            Ok(())
        }
        Header::VersionNegotiate { .. } => {
            // Ignored once another packet was already authenticated.
            if self.total_authed_packets > 1 {
                return Ok(());
            }
            // The payload is read as a list of 4-byte big-endian versions; a
            // trailing partial chunk never matches.
            let supported = packet
                .payload
                .chunks(4)
                .any(|x| match <[u8; 4]>::try_from(x) {
                    Ok(version) => self.version == u32::from_be_bytes(version),
                    Err(_) => false,
                });
            // A list that contains our own version is ignored.
            if supported {
                return Ok(());
            }
            debug!("remote doesn't support our version");
            Err(ConnectionError::VersionMismatch)
        }
        Header::Short { .. } => unreachable!(
            "short packets received during handshake are discarded in handle_packet"
        ),
    }
}
4095
4096 fn process_early_payload(
4098 &mut self,
4099 now: Instant,
4100 path_id: PathId,
4101 packet: Packet,
4102 #[allow(unused)] qlog: &mut QlogRecvPacket,
4103 ) -> Result<(), TransportError> {
4104 debug_assert_ne!(packet.header.space(), SpaceId::Data);
4105 debug_assert_eq!(path_id, PathId::ZERO);
4106 let payload_len = packet.payload.len();
4107 let mut ack_eliciting = false;
4108 for result in frame::Iter::new(packet.payload.freeze())? {
4109 let frame = result?;
4110 qlog.frame(&frame);
4111 let span = match frame {
4112 Frame::Padding => continue,
4113 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4114 };
4115
4116 self.stats.frame_rx.record(&frame);
4117
4118 let _guard = span.as_ref().map(|x| x.enter());
4119 ack_eliciting |= frame.is_ack_eliciting();
4120
4121 if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4123 return Err(TransportError::PROTOCOL_VIOLATION(
4124 "illegal frame type in handshake",
4125 ));
4126 }
4127
4128 match frame {
4129 Frame::Padding | Frame::Ping => {}
4130 Frame::Crypto(frame) => {
4131 self.read_crypto(packet.header.space(), &frame, payload_len)?;
4132 }
4133 Frame::Ack(ack) => {
4134 self.on_ack_received(now, packet.header.space(), ack)?;
4135 }
4136 Frame::PathAck(ack) => {
4137 span.as_ref()
4138 .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4139 self.on_path_ack_received(now, packet.header.space(), ack)?;
4140 }
4141 Frame::Close(reason) => {
4142 self.state.move_to_draining(Some(reason.into()));
4143 return Ok(());
4144 }
4145 _ => {
4146 let mut err =
4147 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4148 err.frame = Some(frame.ty());
4149 return Err(err);
4150 }
4151 }
4152 }
4153
4154 if ack_eliciting {
4155 self.spaces[packet.header.space()]
4157 .for_path(path_id)
4158 .pending_acks
4159 .set_immediate_ack_required();
4160 }
4161
4162 self.write_crypto();
4163 Ok(())
4164 }
4165
4166 fn process_payload(
4168 &mut self,
4169 now: Instant,
4170 remote: SocketAddr,
4171 path_id: PathId,
4172 number: u64,
4173 packet: Packet,
4174 #[allow(unused)] qlog: &mut QlogRecvPacket,
4175 ) -> Result<(), TransportError> {
4176 let payload = packet.payload.freeze();
4177 let mut is_probing_packet = true;
4178 let mut close = None;
4179 let payload_len = payload.len();
4180 let mut ack_eliciting = false;
4181 let mut migration_observed_addr = None;
4184 for result in frame::Iter::new(payload)? {
4185 let frame = result?;
4186 qlog.frame(&frame);
4187 let span = match frame {
4188 Frame::Padding => continue,
4189 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4190 };
4191
4192 self.stats.frame_rx.record(&frame);
4193 match &frame {
4196 Frame::Crypto(f) => {
4197 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4198 }
4199 Frame::Stream(f) => {
4200 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4201 }
4202 Frame::Datagram(f) => {
4203 trace!(len = f.data.len(), "got datagram frame");
4204 }
4205 f => {
4206 trace!("got frame {f}");
4207 }
4208 }
4209
4210 let _guard = span.enter();
4211 if packet.header.is_0rtt() {
4212 match frame {
4213 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4214 return Err(TransportError::PROTOCOL_VIOLATION(
4215 "illegal frame type in 0-RTT",
4216 ));
4217 }
4218 _ => {
4219 if frame.is_1rtt() {
4220 return Err(TransportError::PROTOCOL_VIOLATION(
4221 "illegal frame type in 0-RTT",
4222 ));
4223 }
4224 }
4225 }
4226 }
4227 ack_eliciting |= frame.is_ack_eliciting();
4228
4229 match frame {
4231 Frame::Padding
4232 | Frame::PathChallenge(_)
4233 | Frame::PathResponse(_)
4234 | Frame::NewConnectionId(_)
4235 | Frame::ObservedAddr(_) => {}
4236 _ => {
4237 is_probing_packet = false;
4238 }
4239 }
4240
4241 match frame {
4242 Frame::Crypto(frame) => {
4243 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4244 }
4245 Frame::Stream(frame) => {
4246 if self.streams.received(frame, payload_len)?.should_transmit() {
4247 self.spaces[SpaceId::Data].pending.max_data = true;
4248 }
4249 }
4250 Frame::Ack(ack) => {
4251 self.on_ack_received(now, SpaceId::Data, ack)?;
4252 }
4253 Frame::PathAck(ack) => {
4254 span.record("path", tracing::field::debug(&ack.path_id));
4255 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4256 }
4257 Frame::Padding | Frame::Ping => {}
4258 Frame::Close(reason) => {
4259 close = Some(reason);
4260 }
4261 Frame::PathChallenge(challenge) => {
4262 let path = &mut self
4263 .path_mut(path_id)
4264 .expect("payload is processed only after the path becomes known");
4265 path.path_responses.push(number, challenge.0, remote);
4266 if remote == path.remote {
4267 match self.peer_supports_ack_frequency() {
4277 true => self.immediate_ack(path_id),
4278 false => {
4279 self.ping_path(path_id).ok();
4280 }
4281 }
4282 }
4283 }
4284 Frame::PathResponse(response) => {
4285 let path = self
4286 .paths
4287 .get_mut(&path_id)
4288 .expect("payload is processed only after the path becomes known");
4289
4290 use PathTimer::*;
4291 use paths::OnPathResponseReceived::*;
4292 match path.data.on_path_response_received(now, response.0, remote) {
4293 OnPath { was_open } => {
4294 let qlog = self.qlog.with_time(now);
4295
4296 self.timers
4297 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4298 self.timers
4299 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4300
4301 let next_challenge = path
4302 .data
4303 .earliest_expiring_challenge()
4304 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4305 self.timers.set_or_stop(
4306 Timer::PerPath(path_id, PathChallengeLost),
4307 next_challenge,
4308 qlog,
4309 );
4310
4311 if !was_open {
4312 self.events
4313 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4314 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4315 {
4316 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4317 id: path_id,
4318 addr: observed.socket_addr(),
4319 }));
4320 }
4321 }
4322 if let Some((_, ref mut prev)) = path.prev {
4323 prev.challenges_sent.clear();
4324 prev.send_new_challenge = false;
4325 }
4326 }
4327 OffPath => {
4328 debug!("Response to off-path PathChallenge!");
4329 let next_challenge = path
4330 .data
4331 .earliest_expiring_challenge()
4332 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4333 self.timers.set_or_stop(
4334 Timer::PerPath(path_id, PathChallengeLost),
4335 next_challenge,
4336 self.qlog.with_time(now),
4337 );
4338 }
4339 Invalid { expected } => {
4340 debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE")
4341 }
4342 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4343 }
4344 }
4345 Frame::MaxData(bytes) => {
4346 self.streams.received_max_data(bytes);
4347 }
4348 Frame::MaxStreamData { id, offset } => {
4349 self.streams.received_max_stream_data(id, offset)?;
4350 }
4351 Frame::MaxStreams { dir, count } => {
4352 self.streams.received_max_streams(dir, count)?;
4353 }
4354 Frame::ResetStream(frame) => {
4355 if self.streams.received_reset(frame)?.should_transmit() {
4356 self.spaces[SpaceId::Data].pending.max_data = true;
4357 }
4358 }
4359 Frame::DataBlocked { offset } => {
4360 debug!(offset, "peer claims to be blocked at connection level");
4361 }
4362 Frame::StreamDataBlocked { id, offset } => {
4363 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4364 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4365 return Err(TransportError::STREAM_STATE_ERROR(
4366 "STREAM_DATA_BLOCKED on send-only stream",
4367 ));
4368 }
4369 debug!(
4370 stream = %id,
4371 offset, "peer claims to be blocked at stream level"
4372 );
4373 }
4374 Frame::StreamsBlocked { dir, limit } => {
4375 if limit > MAX_STREAM_COUNT {
4376 return Err(TransportError::FRAME_ENCODING_ERROR(
4377 "unrepresentable stream limit",
4378 ));
4379 }
4380 debug!(
4381 "peer claims to be blocked opening more than {} {} streams",
4382 limit, dir
4383 );
4384 }
4385 Frame::StopSending(frame::StopSending { id, error_code }) => {
4386 if id.initiator() != self.side.side() {
4387 if id.dir() == Dir::Uni {
4388 debug!("got STOP_SENDING on recv-only {}", id);
4389 return Err(TransportError::STREAM_STATE_ERROR(
4390 "STOP_SENDING on recv-only stream",
4391 ));
4392 }
4393 } else if self.streams.is_local_unopened(id) {
4394 return Err(TransportError::STREAM_STATE_ERROR(
4395 "STOP_SENDING on unopened stream",
4396 ));
4397 }
4398 self.streams.received_stop_sending(id, error_code);
4399 }
4400 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4401 if let Some(ref path_id) = path_id {
4402 span.record("path", tracing::field::debug(&path_id));
4403 }
4404 let path_id = path_id.unwrap_or_default();
4405 match self.local_cid_state.get_mut(&path_id) {
4406 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4407 Some(cid_state) => {
4408 let allow_more_cids = cid_state
4409 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4410
4411 let has_path = !self.abandoned_paths.contains(&path_id);
4415 let allow_more_cids = allow_more_cids && has_path;
4416
4417 self.endpoint_events
4418 .push_back(EndpointEventInner::RetireConnectionId(
4419 now,
4420 path_id,
4421 sequence,
4422 allow_more_cids,
4423 ));
4424 }
4425 }
4426 }
4427 Frame::NewConnectionId(frame) => {
4428 let path_id = if let Some(path_id) = frame.path_id {
4429 if !self.is_multipath_negotiated() {
4430 return Err(TransportError::PROTOCOL_VIOLATION(
4431 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4432 ));
4433 }
4434 if path_id > self.local_max_path_id {
4435 return Err(TransportError::PROTOCOL_VIOLATION(
4436 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4437 ));
4438 }
4439 path_id
4440 } else {
4441 PathId::ZERO
4442 };
4443
4444 if self.abandoned_paths.contains(&path_id) {
4445 trace!("ignoring issued CID for abandoned path");
4446 continue;
4447 }
4448 if let Some(ref path_id) = frame.path_id {
4449 span.record("path", tracing::field::debug(&path_id));
4450 }
4451 let rem_cids = self
4452 .rem_cids
4453 .entry(path_id)
4454 .or_insert_with(|| CidQueue::new(frame.id));
4455 if rem_cids.active().is_empty() {
4456 return Err(TransportError::PROTOCOL_VIOLATION(
4457 "NEW_CONNECTION_ID when CIDs aren't in use",
4458 ));
4459 }
4460 if frame.retire_prior_to > frame.sequence {
4461 return Err(TransportError::PROTOCOL_VIOLATION(
4462 "NEW_CONNECTION_ID retiring unissued CIDs",
4463 ));
4464 }
4465
4466 use crate::cid_queue::InsertError;
4467 match rem_cids.insert(frame) {
4468 Ok(None) if self.path(path_id).is_none() => {
4469 self.continue_nat_traversal_round(now);
4472 }
4473 Ok(None) => {}
4474 Ok(Some((retired, reset_token))) => {
4475 let pending_retired =
4476 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4477 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4480 if (pending_retired.len() as u64)
4483 .saturating_add(retired.end.saturating_sub(retired.start))
4484 > MAX_PENDING_RETIRED_CIDS
4485 {
4486 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4487 "queued too many retired CIDs",
4488 ));
4489 }
4490 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4491 self.set_reset_token(path_id, remote, reset_token);
4492 }
4493 Err(InsertError::ExceedsLimit) => {
4494 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4495 }
4496 Err(InsertError::Retired) => {
4497 trace!("discarding already-retired");
4498 self.spaces[SpaceId::Data]
4502 .pending
4503 .retire_cids
4504 .push((path_id, frame.sequence));
4505 continue;
4506 }
4507 };
4508
4509 if self.side.is_server()
4510 && path_id == PathId::ZERO
4511 && self
4512 .rem_cids
4513 .get(&PathId::ZERO)
4514 .map(|cids| cids.active_seq() == 0)
4515 .unwrap_or_default()
4516 {
4517 self.update_rem_cid(PathId::ZERO);
4520 }
4521 }
4522 Frame::NewToken(NewToken { token }) => {
4523 let ConnectionSide::Client {
4524 token_store,
4525 server_name,
4526 ..
4527 } = &self.side
4528 else {
4529 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4530 };
4531 if token.is_empty() {
4532 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4533 }
4534 trace!("got new token");
4535 token_store.insert(server_name, token);
4536 }
4537 Frame::Datagram(datagram) => {
4538 if self
4539 .datagrams
4540 .received(datagram, &self.config.datagram_receive_buffer_size)?
4541 {
4542 self.events.push_back(Event::DatagramReceived);
4543 }
4544 }
4545 Frame::AckFrequency(ack_frequency) => {
4546 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4549 continue;
4552 }
4553
4554 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4556 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4557
4558 if let Some(timeout) = space
4561 .pending_acks
4562 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4563 {
4564 self.timers.set(
4565 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4566 timeout,
4567 self.qlog.with_time(now),
4568 );
4569 }
4570 }
4571 }
4572 Frame::ImmediateAck => {
4573 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4575 pns.pending_acks.set_immediate_ack_required();
4576 }
4577 }
4578 Frame::HandshakeDone => {
4579 if self.side.is_server() {
4580 return Err(TransportError::PROTOCOL_VIOLATION(
4581 "client sent HANDSHAKE_DONE",
4582 ));
4583 }
4584 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4585 self.discard_space(now, SpaceId::Handshake);
4586 }
4587 self.events.push_back(Event::HandshakeConfirmed);
4588 trace!("handshake confirmed");
4589 }
4590 Frame::ObservedAddr(observed) => {
4591 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4593 if !self
4594 .peer_params
4595 .address_discovery_role
4596 .should_report(&self.config.address_discovery_role)
4597 {
4598 return Err(TransportError::PROTOCOL_VIOLATION(
4599 "received OBSERVED_ADDRESS frame when not negotiated",
4600 ));
4601 }
4602 if packet.header.space() != SpaceId::Data {
4604 return Err(TransportError::PROTOCOL_VIOLATION(
4605 "OBSERVED_ADDRESS frame outside data space",
4606 ));
4607 }
4608
4609 let path = self.path_data_mut(path_id);
4610 if remote == path.remote {
4611 if let Some(updated) = path.update_observed_addr_report(observed) {
4612 if path.open {
4613 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4614 id: path_id,
4615 addr: updated,
4616 }));
4617 }
4618 }
4620 } else {
4621 migration_observed_addr = Some(observed)
4623 }
4624 }
4625 Frame::PathAbandon(frame::PathAbandon {
4626 path_id,
4627 error_code,
4628 }) => {
4629 span.record("path", tracing::field::debug(&path_id));
4630 let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4632 Ok(()) => {
4633 trace!("peer abandoned path");
4634 false
4635 }
4636 Err(ClosePathError::LastOpenPath) => {
4637 trace!("peer abandoned last path, closing connection");
4638 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4640 }
4641 Err(ClosePathError::ClosedPath) => {
4642 trace!("peer abandoned already closed path");
4643 true
4644 }
4645 };
4646 if self.path(path_id).is_some() && !already_abandoned {
4651 let delay = self.pto(SpaceId::Data, path_id) * 3;
4656 self.timers.set(
4657 Timer::PerPath(path_id, PathTimer::DiscardPath),
4658 now + delay,
4659 self.qlog.with_time(now),
4660 );
4661 }
4662 }
4663 Frame::PathStatusAvailable(info) => {
4664 span.record("path", tracing::field::debug(&info.path_id));
4665 if self.is_multipath_negotiated() {
4666 self.on_path_status(
4667 info.path_id,
4668 PathStatus::Available,
4669 info.status_seq_no,
4670 );
4671 } else {
4672 return Err(TransportError::PROTOCOL_VIOLATION(
4673 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4674 ));
4675 }
4676 }
4677 Frame::PathStatusBackup(info) => {
4678 span.record("path", tracing::field::debug(&info.path_id));
4679 if self.is_multipath_negotiated() {
4680 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4681 } else {
4682 return Err(TransportError::PROTOCOL_VIOLATION(
4683 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4684 ));
4685 }
4686 }
4687 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4688 span.record("path", tracing::field::debug(&path_id));
4689 if !self.is_multipath_negotiated() {
4690 return Err(TransportError::PROTOCOL_VIOLATION(
4691 "received MAX_PATH_ID frame when multipath was not negotiated",
4692 ));
4693 }
4694 if path_id > self.remote_max_path_id {
4696 self.remote_max_path_id = path_id;
4697 self.issue_first_path_cids(now);
4698 while let Some(true) = self.continue_nat_traversal_round(now) {}
4699 }
4700 }
4701 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4702 if self.is_multipath_negotiated() {
4706 if self.local_max_path_id > max_path_id {
4707 return Err(TransportError::PROTOCOL_VIOLATION(
4708 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4709 ));
4710 }
4711 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4712 } else {
4714 return Err(TransportError::PROTOCOL_VIOLATION(
4715 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4716 ));
4717 }
4718 }
4719 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4720 if self.is_multipath_negotiated() {
4728 if path_id > self.local_max_path_id {
4729 return Err(TransportError::PROTOCOL_VIOLATION(
4730 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4731 ));
4732 }
4733 if next_seq.0
4734 > self
4735 .local_cid_state
4736 .get(&path_id)
4737 .map(|cid_state| cid_state.active_seq().1 + 1)
4738 .unwrap_or_default()
4739 {
4740 return Err(TransportError::PROTOCOL_VIOLATION(
4741 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4742 ));
4743 }
4744 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4745 } else {
4746 return Err(TransportError::PROTOCOL_VIOLATION(
4747 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4748 ));
4749 }
4750 }
4751 Frame::AddAddress(addr) => {
4752 let client_state = match self.iroh_hp.client_side_mut() {
4753 Ok(state) => state,
4754 Err(err) => {
4755 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4756 "Nat traversal(ADD_ADDRESS): {err}"
4757 )));
4758 }
4759 };
4760
4761 if !client_state.check_remote_address(&addr) {
4762 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4764 }
4765
4766 match client_state.add_remote_address(addr) {
4767 Ok(maybe_added) => {
4768 if let Some(added) = maybe_added {
4769 self.events.push_back(Event::NatTraversal(
4770 iroh_hp::Event::AddressAdded(added),
4771 ));
4772 }
4773 }
4774 Err(e) => {
4775 warn!(%e, "failed to add remote address")
4776 }
4777 }
4778 }
4779 Frame::RemoveAddress(addr) => {
4780 let client_state = match self.iroh_hp.client_side_mut() {
4781 Ok(state) => state,
4782 Err(err) => {
4783 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4784 "Nat traversal(REMOVE_ADDRESS): {err}"
4785 )));
4786 }
4787 };
4788 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4789 self.events
4790 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4791 removed_addr,
4792 )));
4793 }
4794 }
4795 Frame::ReachOut(reach_out) => {
4796 let server_state = match self.iroh_hp.server_side_mut() {
4797 Ok(state) => state,
4798 Err(err) => {
4799 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4800 "Nat traversal(REACH_OUT): {err}"
4801 )));
4802 }
4803 };
4804
4805 if let Err(err) = server_state.handle_reach_out(reach_out) {
4806 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4807 "Nat traversal(REACH_OUT): {err}"
4808 )));
4809 }
4810 }
4811 }
4812 }
4813
4814 let space = self.spaces[SpaceId::Data].for_path(path_id);
4815 if space
4816 .pending_acks
4817 .packet_received(now, number, ack_eliciting, &space.dedup)
4818 {
4819 if self.abandoned_paths.contains(&path_id) {
4820 space.pending_acks.set_immediate_ack_required();
4823 } else {
4824 self.timers.set(
4825 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4826 now + self.ack_frequency.max_ack_delay,
4827 self.qlog.with_time(now),
4828 );
4829 }
4830 }
4831
4832 let pending = &mut self.spaces[SpaceId::Data].pending;
4837 self.streams.queue_max_stream_id(pending);
4838
4839 if let Some(reason) = close {
4840 self.state.move_to_draining(Some(reason.into()));
4841 self.close = true;
4842 }
4843
4844 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4845 && !is_probing_packet
4846 && remote != self.path_data(path_id).remote
4847 {
4848 let ConnectionSide::Server { ref server_config } = self.side else {
4849 panic!("packets from unknown remote should be dropped by clients");
4850 };
4851 debug_assert!(
4852 server_config.migration,
4853 "migration-initiating packets should have been dropped immediately"
4854 );
4855 self.migrate(path_id, now, remote, migration_observed_addr);
4856 self.update_rem_cid(path_id);
4858 self.spin = false;
4859 }
4860
4861 Ok(())
4862 }
4863
4864 fn migrate(
4865 &mut self,
4866 path_id: PathId,
4867 now: Instant,
4868 remote: SocketAddr,
4869 observed_addr: Option<ObservedAddr>,
4870 ) {
4871 trace!(%remote, %path_id, "migration initiated");
4872 self.path_counter = self.path_counter.wrapping_add(1);
4873 let prev_pto = self.pto(SpaceId::Data, path_id);
4880 let known_path = self.paths.get_mut(&path_id).expect("known path");
4881 let path = &mut known_path.data;
4882 let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4883 PathData::from_previous(remote, path, self.path_counter, now)
4884 } else {
4885 let peer_max_udp_payload_size =
4886 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4887 .unwrap_or(u16::MAX);
4888 PathData::new(
4889 remote,
4890 self.allow_mtud,
4891 Some(peer_max_udp_payload_size),
4892 self.path_counter,
4893 now,
4894 &self.config,
4895 )
4896 };
4897 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4898 if let Some(report) = observed_addr {
4899 if let Some(updated) = new_path.update_observed_addr_report(report) {
4900 tracing::info!("adding observed addr event from migration");
4901 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4902 id: path_id,
4903 addr: updated,
4904 }));
4905 }
4906 }
4907 new_path.send_new_challenge = true;
4908
4909 let mut prev = mem::replace(path, new_path);
4910 if !prev.is_validating_path() {
4912 prev.send_new_challenge = true;
4913 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4917 }
4918
4919 self.qlog.emit_tuple_assigned(path_id, remote, now);
4921
4922 self.timers.set(
4923 Timer::PerPath(path_id, PathTimer::PathValidation),
4924 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4925 self.qlog.with_time(now),
4926 );
4927 }
4928
4929 pub fn local_address_changed(&mut self) {
4931 self.update_rem_cid(PathId::ZERO);
4933 self.ping();
4934 }
4935
4936 fn update_rem_cid(&mut self, path_id: PathId) {
4938 let Some((reset_token, retired)) =
4939 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4940 else {
4941 return;
4942 };
4943
4944 self.spaces[SpaceId::Data]
4946 .pending
4947 .retire_cids
4948 .extend(retired.map(|seq| (path_id, seq)));
4949 let remote = self.path_data(path_id).remote;
4950 self.set_reset_token(path_id, remote, reset_token);
4951 }
4952
4953 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4962 self.endpoint_events
4963 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4964
4965 if path_id == PathId::ZERO {
4971 self.peer_params.stateless_reset_token = Some(reset_token);
4972 }
4973 }
4974
4975 fn issue_first_cids(&mut self, now: Instant) {
4977 if self
4978 .local_cid_state
4979 .get(&PathId::ZERO)
4980 .expect("PathId::ZERO exists when the connection is created")
4981 .cid_len()
4982 == 0
4983 {
4984 return;
4985 }
4986
4987 let mut n = self.peer_params.issue_cids_limit() - 1;
4989 if let ConnectionSide::Server { server_config } = &self.side {
4990 if server_config.has_preferred_address() {
4991 n -= 1;
4993 }
4994 }
4995 self.endpoint_events
4996 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
4997 }
4998
4999 fn issue_first_path_cids(&mut self, now: Instant) {
5003 if let Some(max_path_id) = self.max_path_id() {
5004 let mut path_id = self.max_path_id_with_cids.next();
5005 while path_id <= max_path_id {
5006 self.endpoint_events
5007 .push_back(EndpointEventInner::NeedIdentifiers(
5008 path_id,
5009 now,
5010 self.peer_params.issue_cids_limit(),
5011 ));
5012 path_id = path_id.next();
5013 }
5014 self.max_path_id_with_cids = max_path_id;
5015 }
5016 }
5017
5018 fn populate_packet(
5026 &mut self,
5027 now: Instant,
5028 space_id: SpaceId,
5029 path_id: PathId,
5030 path_exclusive_only: bool,
5031 buf: &mut impl BufMut,
5032 pn: u64,
5033 #[allow(unused)] qlog: &mut QlogSentPacket,
5034 ) -> SentFrames {
5035 let mut sent = SentFrames::default();
5036 let is_multipath_negotiated = self.is_multipath_negotiated();
5037 let space = &mut self.spaces[space_id];
5038 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5039 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5040 space
5041 .for_path(path_id)
5042 .pending_acks
5043 .maybe_ack_non_eliciting();
5044
5045 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5047 trace!("HANDSHAKE_DONE");
5048 buf.write(frame::FrameType::HANDSHAKE_DONE);
5049 qlog.frame(&Frame::HandshakeDone);
5050 sent.retransmits.get_or_create().handshake_done = true;
5051 self.stats.frame_tx.handshake_done =
5053 self.stats.frame_tx.handshake_done.saturating_add(1);
5054 }
5055
5056 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5059 while let Some(local_addr) = addresses.pop() {
5060 let reach_out = frame::ReachOut::new(*round, local_addr);
5061 if buf.remaining_mut() > reach_out.size() {
5062 trace!(%round, ?local_addr, "REACH_OUT");
5063 reach_out.write(buf);
5064 let sent_reachouts = sent
5065 .retransmits
5066 .get_or_create()
5067 .reach_out
5068 .get_or_insert_with(|| (*round, Default::default()));
5069 sent_reachouts.1.push(local_addr);
5070 self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5071 qlog.frame(&Frame::ReachOut(reach_out));
5072 } else {
5073 addresses.push(local_addr);
5074 break;
5075 }
5076 }
5077 if addresses.is_empty() {
5078 space.pending.reach_out = None;
5079 }
5080 }
5081
5082 if !path_exclusive_only
5084 && space_id == SpaceId::Data
5085 && self
5086 .config
5087 .address_discovery_role
5088 .should_report(&self.peer_params.address_discovery_role)
5089 && (!path.observed_addr_sent || space.pending.observed_addr)
5090 {
5091 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5092 if buf.remaining_mut() > frame.size() {
5093 trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5094 frame.write(buf);
5095
5096 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5097 path.observed_addr_sent = true;
5098
5099 self.stats.frame_tx.observed_addr += 1;
5100 sent.retransmits.get_or_create().observed_addr = true;
5101 space.pending.observed_addr = false;
5102 qlog.frame(&Frame::ObservedAddr(frame));
5103 }
5104 }
5105
5106 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5108 trace!("PING");
5109 buf.write(frame::FrameType::PING);
5110 sent.non_retransmits = true;
5111 self.stats.frame_tx.ping += 1;
5112 qlog.frame(&Frame::Ping);
5113 }
5114
5115 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5117 debug_assert_eq!(
5118 space_id,
5119 SpaceId::Data,
5120 "immediate acks must be sent in the data space"
5121 );
5122 trace!("IMMEDIATE_ACK");
5123 buf.write(frame::FrameType::IMMEDIATE_ACK);
5124 sent.non_retransmits = true;
5125 self.stats.frame_tx.immediate_ack += 1;
5126 qlog.frame(&Frame::ImmediateAck);
5127 }
5128
5129 if !path_exclusive_only {
5133 for path_id in space
5134 .number_spaces
5135 .iter_mut()
5136 .filter(|(_, pns)| pns.pending_acks.can_send())
5137 .map(|(&path_id, _)| path_id)
5138 .collect::<Vec<_>>()
5139 {
5140 Self::populate_acks(
5141 now,
5142 self.receiving_ecn,
5143 &mut sent,
5144 path_id,
5145 space_id,
5146 space,
5147 is_multipath_negotiated,
5148 buf,
5149 &mut self.stats,
5150 qlog,
5151 );
5152 }
5153 }
5154
5155 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5157 let sequence_number = self.ack_frequency.next_sequence_number();
5158
5159 let config = self.config.ack_frequency_config.as_ref().unwrap();
5161
5162 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5164 path.rtt.get(),
5165 config,
5166 &self.peer_params,
5167 );
5168
5169 trace!(?max_ack_delay, "ACK_FREQUENCY");
5170
5171 let frame = frame::AckFrequency {
5172 sequence: sequence_number,
5173 ack_eliciting_threshold: config.ack_eliciting_threshold,
5174 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5175 reordering_threshold: config.reordering_threshold,
5176 };
5177 frame.encode(buf);
5178 qlog.frame(&Frame::AckFrequency(frame));
5179
5180 sent.retransmits.get_or_create().ack_frequency = true;
5181
5182 self.ack_frequency
5183 .ack_frequency_sent(path_id, pn, max_ack_delay);
5184 self.stats.frame_tx.ack_frequency += 1;
5185 }
5186
5187 if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5189 && space_id == SpaceId::Data
5190 && path.send_new_challenge
5191 && !self.state.is_closed()
5192 {
5194 path.send_new_challenge = false;
5195
5196 let token = self.rng.random();
5198 let info = paths::SentChallengeInfo {
5199 sent_instant: now,
5200 remote: path.remote,
5201 };
5202 path.challenges_sent.insert(token, info);
5203 sent.non_retransmits = true;
5204 sent.requires_padding = true;
5205 let challenge = frame::PathChallenge(token);
5206 trace!(frame = %challenge);
5207 buf.write(challenge);
5208 qlog.frame(&Frame::PathChallenge(challenge));
5209 self.stats.frame_tx.path_challenge += 1;
5210 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5211 self.timers.set(
5212 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5213 now + pto,
5214 self.qlog.with_time(now),
5215 );
5216
5217 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5218 space.pending.path_status.insert(path_id);
5220 }
5221
5222 if space_id == SpaceId::Data
5225 && self
5226 .config
5227 .address_discovery_role
5228 .should_report(&self.peer_params.address_discovery_role)
5229 {
5230 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5231 if buf.remaining_mut() > frame.size() {
5232 frame.write(buf);
5233 qlog.frame(&Frame::ObservedAddr(frame));
5234
5235 self.next_observed_addr_seq_no =
5236 self.next_observed_addr_seq_no.saturating_add(1u8);
5237 path.observed_addr_sent = true;
5238
5239 self.stats.frame_tx.observed_addr += 1;
5240 sent.retransmits.get_or_create().observed_addr = true;
5241 space.pending.observed_addr = false;
5242 }
5243 }
5244 }
5245
5246 if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5248 if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5249 sent.non_retransmits = true;
5250 sent.requires_padding = true;
5251 let response = frame::PathResponse(token);
5252 trace!(frame = %response);
5253 buf.write(response);
5254 qlog.frame(&Frame::PathResponse(response));
5255 self.stats.frame_tx.path_response += 1;
5256
5257 if space_id == SpaceId::Data
5261 && self
5262 .config
5263 .address_discovery_role
5264 .should_report(&self.peer_params.address_discovery_role)
5265 {
5266 let frame =
5267 frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5268 if buf.remaining_mut() > frame.size() {
5269 frame.write(buf);
5270 qlog.frame(&Frame::ObservedAddr(frame));
5271
5272 self.next_observed_addr_seq_no =
5273 self.next_observed_addr_seq_no.saturating_add(1u8);
5274 path.observed_addr_sent = true;
5275
5276 self.stats.frame_tx.observed_addr += 1;
5277 sent.retransmits.get_or_create().observed_addr = true;
5278 space.pending.observed_addr = false;
5279 }
5280 }
5281 }
5282 }
5283
5284 while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5286 let mut frame = match space.pending.crypto.pop_front() {
5287 Some(x) => x,
5288 None => break,
5289 };
5290
5291 let max_crypto_data_size = buf.remaining_mut()
5296 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5298 - 2; let len = frame
5301 .data
5302 .len()
5303 .min(2usize.pow(14) - 1)
5304 .min(max_crypto_data_size);
5305
5306 let data = frame.data.split_to(len);
5307 let truncated = frame::Crypto {
5308 offset: frame.offset,
5309 data,
5310 };
5311 trace!(
5312 "CRYPTO: off {} len {}",
5313 truncated.offset,
5314 truncated.data.len()
5315 );
5316 truncated.encode(buf);
5317 self.stats.frame_tx.crypto += 1;
5318
5319 #[cfg(feature = "qlog")]
5321 qlog.frame(&Frame::Crypto(truncated.clone()));
5322 sent.retransmits.get_or_create().crypto.push_back(truncated);
5323 if !frame.data.is_empty() {
5324 frame.offset += len as u64;
5325 space.pending.crypto.push_front(frame);
5326 }
5327 }
5328
5329 while !path_exclusive_only
5332 && space_id == SpaceId::Data
5333 && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5334 {
5335 let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5336 break;
5337 };
5338 let frame = frame::PathAbandon {
5339 path_id,
5340 error_code,
5341 };
5342 frame.encode(buf);
5343 qlog.frame(&Frame::PathAbandon(frame));
5344 self.stats.frame_tx.path_abandon += 1;
5345 trace!(%path_id, "PATH_ABANDON");
5346 sent.retransmits
5347 .get_or_create()
5348 .path_abandon
5349 .entry(path_id)
5350 .or_insert(error_code);
5351 }
5352
5353 while !path_exclusive_only
5355 && space_id == SpaceId::Data
5356 && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5357 {
5358 let Some(path_id) = space.pending.path_status.pop_first() else {
5359 break;
5360 };
5361 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5362 trace!(%path_id, "discarding queued path status for unknown path");
5363 continue;
5364 };
5365
5366 let seq = path.status.seq();
5367 sent.retransmits.get_or_create().path_status.insert(path_id);
5368 match path.local_status() {
5369 PathStatus::Available => {
5370 let frame = frame::PathStatusAvailable {
5371 path_id,
5372 status_seq_no: seq,
5373 };
5374 frame.encode(buf);
5375 qlog.frame(&Frame::PathStatusAvailable(frame));
5376 self.stats.frame_tx.path_status_available += 1;
5377 trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5378 }
5379 PathStatus::Backup => {
5380 let frame = frame::PathStatusBackup {
5381 path_id,
5382 status_seq_no: seq,
5383 };
5384 frame.encode(buf);
5385 qlog.frame(&Frame::PathStatusBackup(frame));
5386 self.stats.frame_tx.path_status_backup += 1;
5387 trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5388 }
5389 }
5390 }
5391
5392 if space_id == SpaceId::Data
5394 && space.pending.max_path_id
5395 && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5396 {
5397 let frame = frame::MaxPathId(self.local_max_path_id);
5398 frame.encode(buf);
5399 qlog.frame(&Frame::MaxPathId(frame));
5400 space.pending.max_path_id = false;
5401 sent.retransmits.get_or_create().max_path_id = true;
5402 trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5403 self.stats.frame_tx.max_path_id += 1;
5404 }
5405
5406 if space_id == SpaceId::Data
5408 && space.pending.paths_blocked
5409 && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5410 {
5411 let frame = frame::PathsBlocked(self.remote_max_path_id);
5412 frame.encode(buf);
5413 qlog.frame(&Frame::PathsBlocked(frame));
5414 space.pending.paths_blocked = false;
5415 sent.retransmits.get_or_create().paths_blocked = true;
5416 trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5417 self.stats.frame_tx.paths_blocked += 1;
5418 }
5419
5420 while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5422 {
5423 let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5424 break;
5425 };
5426 let next_seq = match self.rem_cids.get(&path_id) {
5427 Some(cid_queue) => cid_queue.active_seq() + 1,
5428 None => 0,
5429 };
5430 let frame = frame::PathCidsBlocked {
5431 path_id,
5432 next_seq: VarInt(next_seq),
5433 };
5434 frame.encode(buf);
5435 qlog.frame(&Frame::PathCidsBlocked(frame));
5436 sent.retransmits
5437 .get_or_create()
5438 .path_cids_blocked
5439 .push(path_id);
5440 trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5441 self.stats.frame_tx.path_cids_blocked += 1;
5442 }
5443
5444 if space_id == SpaceId::Data {
5446 self.streams.write_control_frames(
5447 buf,
5448 &mut space.pending,
5449 &mut sent.retransmits,
5450 &mut self.stats.frame_tx,
5451 qlog,
5452 );
5453 }
5454
5455 let cid_len = self
5457 .local_cid_state
5458 .values()
5459 .map(|cid_state| cid_state.cid_len())
5460 .max()
5461 .expect("some local CID state must exist");
5462 let new_cid_size_bound =
5463 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5464 while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5465 let issued = match space.pending.new_cids.pop() {
5466 Some(x) => x,
5467 None => break,
5468 };
5469 let retire_prior_to = self
5470 .local_cid_state
5471 .get(&issued.path_id)
5472 .map(|cid_state| cid_state.retire_prior_to())
5473 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5474
5475 let cid_path_id = match is_multipath_negotiated {
5476 true => {
5477 trace!(
5478 path_id = ?issued.path_id,
5479 sequence = issued.sequence,
5480 id = %issued.id,
5481 "PATH_NEW_CONNECTION_ID",
5482 );
5483 self.stats.frame_tx.path_new_connection_id += 1;
5484 Some(issued.path_id)
5485 }
5486 false => {
5487 trace!(
5488 sequence = issued.sequence,
5489 id = %issued.id,
5490 "NEW_CONNECTION_ID"
5491 );
5492 debug_assert_eq!(issued.path_id, PathId::ZERO);
5493 self.stats.frame_tx.new_connection_id += 1;
5494 None
5495 }
5496 };
5497 let frame = frame::NewConnectionId {
5498 path_id: cid_path_id,
5499 sequence: issued.sequence,
5500 retire_prior_to,
5501 id: issued.id,
5502 reset_token: issued.reset_token,
5503 };
5504 frame.encode(buf);
5505 sent.retransmits.get_or_create().new_cids.push(issued);
5506 qlog.frame(&Frame::NewConnectionId(frame));
5507 }
5508
5509 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5511 while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5512 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5513 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5514 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5515 self.stats.frame_tx.retire_connection_id += 1;
5516 (None, seq)
5517 }
5518 Some((path_id, seq)) => {
5519 trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5520 self.stats.frame_tx.path_retire_connection_id += 1;
5521 (Some(path_id), seq)
5522 }
5523 None => break,
5524 };
5525 let frame = frame::RetireConnectionId { path_id, sequence };
5526 frame.encode(buf);
5527 qlog.frame(&Frame::RetireConnectionId(frame));
5528 sent.retransmits
5529 .get_or_create()
5530 .retire_cids
5531 .push((path_id.unwrap_or_default(), sequence));
5532 }
5533
5534 let mut sent_datagrams = false;
5536 while !path_exclusive_only
5537 && buf.remaining_mut() > Datagram::SIZE_BOUND
5538 && space_id == SpaceId::Data
5539 {
5540 let prev_remaining = buf.remaining_mut();
5541 match self.datagrams.write(buf) {
5542 true => {
5543 sent_datagrams = true;
5544 sent.non_retransmits = true;
5545 self.stats.frame_tx.datagram += 1;
5546 qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5547 }
5548 false => break,
5549 }
5550 }
5551 if self.datagrams.send_blocked && sent_datagrams {
5552 self.events.push_back(Event::DatagramsUnblocked);
5553 self.datagrams.send_blocked = false;
5554 }
5555
5556 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5557
5558 while let Some(remote_addr) = space.pending.new_tokens.pop() {
5560 if path_exclusive_only {
5561 break;
5562 }
5563 debug_assert_eq!(space_id, SpaceId::Data);
5564 let ConnectionSide::Server { server_config } = &self.side else {
5565 panic!("NEW_TOKEN frames should not be enqueued by clients");
5566 };
5567
5568 if remote_addr != path.remote {
5569 continue;
5574 }
5575
5576 let token = Token::new(
5577 TokenPayload::Validation {
5578 ip: remote_addr.ip(),
5579 issued: server_config.time_source.now(),
5580 },
5581 &mut self.rng,
5582 );
5583 let new_token = NewToken {
5584 token: token.encode(&*server_config.token_key).into(),
5585 };
5586
5587 if buf.remaining_mut() < new_token.size() {
5588 space.pending.new_tokens.push(remote_addr);
5589 break;
5590 }
5591
5592 trace!("NEW_TOKEN");
5593 new_token.encode(buf);
5594 qlog.frame(&Frame::NewToken(new_token));
5595 sent.retransmits
5596 .get_or_create()
5597 .new_tokens
5598 .push(remote_addr);
5599 self.stats.frame_tx.new_token += 1;
5600 }
5601
5602 if !path_exclusive_only && space_id == SpaceId::Data {
5604 sent.stream_frames =
5605 self.streams
5606 .write_stream_frames(buf, self.config.send_fairness, qlog);
5607 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5608 }
5609
5610 while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5613 if let Some(added_address) = space.pending.add_address.pop_last() {
5614 trace!(
5615 seq = %added_address.seq_no,
5616 ip = ?added_address.ip,
5617 port = added_address.port,
5618 "ADD_ADDRESS",
5619 );
5620 added_address.write(buf);
5621 sent.retransmits
5622 .get_or_create()
5623 .add_address
5624 .insert(added_address);
5625 self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5626 qlog.frame(&Frame::AddAddress(added_address));
5627 } else {
5628 break;
5629 }
5630 }
5631
5632 while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5634 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5635 trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5636 removed_address.write(buf);
5637 sent.retransmits
5638 .get_or_create()
5639 .remove_address
5640 .insert(removed_address);
5641 self.stats.frame_tx.remove_address =
5642 self.stats.frame_tx.remove_address.saturating_add(1);
5643 qlog.frame(&Frame::RemoveAddress(removed_address));
5644 } else {
5645 break;
5646 }
5647 }
5648
5649 sent
5650 }
5651
5652 fn populate_acks(
5654 now: Instant,
5655 receiving_ecn: bool,
5656 sent: &mut SentFrames,
5657 path_id: PathId,
5658 space_id: SpaceId,
5659 space: &mut PacketSpace,
5660 is_multipath_negotiated: bool,
5661 buf: &mut impl BufMut,
5662 stats: &mut ConnectionStats,
5663 #[allow(unused)] qlog: &mut QlogSentPacket,
5664 ) {
5665 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5667
5668 debug_assert!(
5669 is_multipath_negotiated || path_id == PathId::ZERO,
5670 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5671 );
5672 if is_multipath_negotiated {
5673 debug_assert!(
5674 space_id == SpaceId::Data || path_id == PathId::ZERO,
5675 "path acks must be sent in 1RTT space (have {space_id:?})"
5676 );
5677 }
5678
5679 let pns = space.for_path(path_id);
5680 let ranges = pns.pending_acks.ranges();
5681 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5682 let ecn = if receiving_ecn {
5683 Some(&pns.ecn_counters)
5684 } else {
5685 None
5686 };
5687 if let Some(max) = ranges.max() {
5688 sent.largest_acked.insert(path_id, max);
5689 }
5690
5691 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5692 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5694 let delay = delay_micros >> ack_delay_exp.into_inner();
5695
5696 if is_multipath_negotiated && space_id == SpaceId::Data {
5697 if !ranges.is_empty() {
5698 trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5699 frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5700 qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5701 stats.frame_tx.path_acks += 1;
5702 }
5703 } else {
5704 trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5705 frame::Ack::encode(delay as _, ranges, ecn, buf);
5706 stats.frame_tx.acks += 1;
5707 qlog.frame_ack(delay, ranges, ecn);
5708 }
5709 }
5710
    /// Shared step for closing the connection: logs the closure and resets the timer table.
    fn close_common(&mut self) {
        trace!("connection closed");
        // NOTE(review): `reset` presumably disarms every pending timer — confirm against the
        // timer table implementation.
        self.timers.reset();
    }
5715
5716 fn set_close_timer(&mut self, now: Instant) {
5717 let pto_max = self.pto_max_path(self.highest_space, true);
5720 self.timers.set(
5721 Timer::Conn(ConnTimer::Close),
5722 now + 3 * pto_max,
5723 self.qlog.with_time(now),
5724 );
5725 }
5726
5727 fn handle_peer_params(
5732 &mut self,
5733 params: TransportParameters,
5734 loc_cid: ConnectionId,
5735 rem_cid: ConnectionId,
5736 now: Instant,
5737 ) -> Result<(), TransportError> {
5738 if Some(self.orig_rem_cid) != params.initial_src_cid
5739 || (self.side.is_client()
5740 && (Some(self.initial_dst_cid) != params.original_dst_cid
5741 || self.retry_src_cid != params.retry_src_cid))
5742 {
5743 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5744 "CID authentication failure",
5745 ));
5746 }
5747 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5748 return Err(TransportError::PROTOCOL_VIOLATION(
5749 "multipath must not use zero-length CIDs",
5750 ));
5751 }
5752
5753 self.set_peer_params(params);
5754 self.qlog.emit_peer_transport_params_received(self, now);
5755
5756 Ok(())
5757 }
5758
5759 fn set_peer_params(&mut self, params: TransportParameters) {
5760 self.streams.set_params(¶ms);
5761 self.idle_timeout =
5762 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5763 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5764
5765 if let Some(ref info) = params.preferred_address {
5766 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5768 path_id: None,
5769 sequence: 1,
5770 id: info.connection_id,
5771 reset_token: info.stateless_reset_token,
5772 retire_prior_to: 0,
5773 })
5774 .expect(
5775 "preferred address CID is the first received, and hence is guaranteed to be legal",
5776 );
5777 let remote = self.path_data(PathId::ZERO).remote;
5778 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5779 }
5780 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5781
5782 let mut multipath_enabled = None;
5783 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5784 self.config.get_initial_max_path_id(),
5785 params.initial_max_path_id,
5786 ) {
5787 self.local_max_path_id = local_max_path_id;
5789 self.remote_max_path_id = remote_max_path_id;
5790 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5791 debug!(%initial_max_path_id, "multipath negotiated");
5792 multipath_enabled = Some(initial_max_path_id);
5793 }
5794
5795 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5796 self.config
5797 .max_remote_nat_traversal_addresses
5798 .zip(params.max_remote_nat_traversal_addresses)
5799 {
5800 if let Some(max_initial_paths) =
5801 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5802 {
5803 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5804 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5805 self.iroh_hp =
5806 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5807 debug!(
5808 %max_remote_addresses, %max_local_addresses,
5809 "iroh hole punching negotiated"
5810 );
5811
5812 match self.side() {
5813 Side::Client => {
5814 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5815 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5818 } else if max_local_addresses as u64
5819 > params.active_connection_id_limit.into_inner()
5820 {
5821 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5825 }
5826 }
5827 Side::Server => {
5828 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5829 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5830 }
5831 }
5832 }
5833 } else {
5834 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5835 }
5836 }
5837
5838 self.peer_params = params;
5839 let peer_max_udp_payload_size =
5840 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5841 self.path_data_mut(PathId::ZERO)
5842 .mtud
5843 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5844 }
5845
5846 fn decrypt_packet(
5848 &mut self,
5849 now: Instant,
5850 path_id: PathId,
5851 packet: &mut Packet,
5852 ) -> Result<Option<u64>, Option<TransportError>> {
5853 let result = packet_crypto::decrypt_packet_body(
5854 packet,
5855 path_id,
5856 &self.spaces,
5857 self.zero_rtt_crypto.as_ref(),
5858 self.key_phase,
5859 self.prev_crypto.as_ref(),
5860 self.next_crypto.as_ref(),
5861 )?;
5862
5863 let result = match result {
5864 Some(r) => r,
5865 None => return Ok(None),
5866 };
5867
5868 if result.outgoing_key_update_acked {
5869 if let Some(prev) = self.prev_crypto.as_mut() {
5870 prev.end_packet = Some((result.number, now));
5871 self.set_key_discard_timer(now, packet.header.space());
5872 }
5873 }
5874
5875 if result.incoming_key_update {
5876 trace!("key update authenticated");
5877 self.update_keys(Some((result.number, now)), true);
5878 self.set_key_discard_timer(now, packet.header.space());
5879 }
5880
5881 Ok(Some(result.number))
5882 }
5883
    /// Rotates the 1-RTT packet keys by one key phase.
    ///
    /// Fresh keys are requested from the crypto session and stored in `next_crypto`; the keys
    /// that were in `next_crypto` become the current packet keys of the `Data` space, and the
    /// keys that were current are kept in `prev_crypto` together with `end_packet`. `remote` is
    /// stored as `update_unacked`. The per-path `sent_with_keys` counters are reset to zero and
    /// `key_phase` is flipped.
    ///
    /// # Panics
    ///
    /// Panics if the crypto session has no next 1-RTT keys, if the `Data` space has no crypto
    /// state, or if `next_crypto` is `None`.
    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
        trace!("executing key update");
        // Keys for the key phase after the one being switched to.
        let new = self
            .crypto
            .next_1rtt_keys()
            .expect("only called for `Data` packets");
        // Usage budget for the new phase: the confidentiality limit minus a safety margin.
        self.key_phase_size = new
            .local
            .confidentiality_limit()
            .saturating_sub(KEY_UPDATE_MARGIN);
        // The inner replace installs `new` as `next_crypto` and yields the former next keys; the
        // outer replace makes those current and yields the former current keys as `old`.
        let old = mem::replace(
            &mut self.spaces[SpaceId::Data]
                .crypto
                .as_mut()
                .unwrap()
                .packet,
            mem::replace(self.next_crypto.as_mut().unwrap(), new),
        );
        self.spaces[SpaceId::Data]
            .iter_paths_mut()
            .for_each(|s| s.sent_with_keys = 0);
        self.prev_crypto = Some(PrevCrypto {
            crypto: old,
            end_packet,
            update_unacked: remote,
        });
        self.key_phase = !self.key_phase;
    }
5915
    /// Returns `true` when the peer's transport parameters carry a `min_ack_delay` value.
    fn peer_supports_ack_frequency(&self) -> bool {
        self.peer_params.min_ack_delay.is_some()
    }
5919
    /// Marks an immediate ACK as pending for `path_id` in the highest packet space.
    ///
    /// In debug builds this asserts that the highest space is `SpaceId::Data`.
    pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
        debug_assert_eq!(
            self.highest_space,
            SpaceId::Data,
            "immediate ack must be written in the data space"
        );
        self.spaces[self.highest_space]
            .for_path(path_id)
            .immediate_ack_pending = true;
    }
5934
5935 #[cfg(test)]
5937 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5938 let (path_id, first_decode, remaining) = match &event.0 {
5939 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5940 path_id,
5941 first_decode,
5942 remaining,
5943 ..
5944 }) => (path_id, first_decode, remaining),
5945 _ => return None,
5946 };
5947
5948 if remaining.is_some() {
5949 panic!("Packets should never be coalesced in tests");
5950 }
5951
5952 let decrypted_header = packet_crypto::unprotect_header(
5953 first_decode.clone(),
5954 &self.spaces,
5955 self.zero_rtt_crypto.as_ref(),
5956 self.peer_params.stateless_reset_token,
5957 )?;
5958
5959 let mut packet = decrypted_header.packet?;
5960 packet_crypto::decrypt_packet_body(
5961 &mut packet,
5962 *path_id,
5963 &self.spaces,
5964 self.zero_rtt_crypto.as_ref(),
5965 self.key_phase,
5966 self.prev_crypto.as_ref(),
5967 self.next_crypto.as_ref(),
5968 )
5969 .ok()?;
5970
5971 Some(packet.payload.to_vec())
5972 }
5973
    /// Test helper: the `in_flight.bytes` counter of path `PathId::ZERO`.
    #[cfg(test)]
    pub(crate) fn bytes_in_flight(&self) -> u64 {
        self.path_data(PathId::ZERO).in_flight.bytes
    }
5981
5982 #[cfg(test)]
5984 pub(crate) fn congestion_window(&self) -> u64 {
5985 let path = self.path_data(PathId::ZERO);
5986 path.congestion
5987 .window()
5988 .saturating_sub(path.in_flight.bytes)
5989 }
5990
5991 #[cfg(test)]
5993 pub(crate) fn is_idle(&self) -> bool {
5994 let current_timers = self.timers.values();
5995 current_timers
5996 .into_iter()
5997 .filter(|(timer, _)| {
5998 !matches!(
5999 timer,
6000 Timer::Conn(ConnTimer::KeepAlive)
6001 | Timer::PerPath(_, PathTimer::PathKeepAlive)
6002 | Timer::Conn(ConnTimer::PushNewCid)
6003 | Timer::Conn(ConnTimer::KeyDiscard)
6004 )
6005 })
6006 .min_by_key(|(_, time)| *time)
6007 .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6008 }
6009
    /// Test helper: the `sending_ecn` flag of path `PathId::ZERO`.
    #[cfg(test)]
    pub(crate) fn using_ecn(&self) -> bool {
        self.path_data(PathId::ZERO).sending_ecn
    }
6015
    /// Test helper: the `total_recvd` counter of path `PathId::ZERO`.
    #[cfg(test)]
    pub(crate) fn total_recvd(&self) -> u64 {
        self.path_data(PathId::ZERO).total_recvd
    }
6021
    /// Test helper: the `active_seq()` pair of the local CID state for `PathId::ZERO`.
    ///
    /// NOTE(review): the pair presumably holds the lowest and highest active sequence numbers —
    /// confirm against `CidState::active_seq`.
    ///
    /// # Panics
    ///
    /// Panics if there is no local CID state for `PathId::ZERO`.
    #[cfg(test)]
    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
        self.local_cid_state
            .get(&PathId::ZERO)
            .unwrap()
            .active_seq()
    }
6029
    /// Test helper: the `active_seq()` pair of the local CID state for the path with the raw id
    /// `path_id`.
    ///
    /// # Panics
    ///
    /// Panics if there is no local CID state for that path; `#[track_caller]` attributes the
    /// panic to the calling test.
    #[cfg(test)]
    #[track_caller]
    pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
        self.local_cid_state
            .get(&PathId(path_id))
            .unwrap()
            .active_seq()
    }
6038
6039 #[cfg(test)]
6042 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6043 let n = self
6044 .local_cid_state
6045 .get_mut(&PathId::ZERO)
6046 .unwrap()
6047 .assign_retire_seq(v);
6048 self.endpoint_events
6049 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6050 }
6051
    /// Test helper: the active sequence number of the remote CID queue for `PathId::ZERO`.
    ///
    /// # Panics
    ///
    /// Panics if there is no remote CID queue for `PathId::ZERO`.
    #[cfg(test)]
    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
        self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
    }
6057
    /// Test helper: the current MTU of the path identified by `path_id`.
    #[cfg(test)]
    pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
        self.path_data(path_id).current_mtu()
    }
6063
6064 #[cfg(test)]
6066 pub(crate) fn trigger_path_validation(&mut self) {
6067 for path in self.paths.values_mut() {
6068 path.data.send_new_challenge = true;
6069 }
6070 }
6071
6072 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6083 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6084 path.data.send_new_challenge
6085 || path
6086 .prev
6087 .as_ref()
6088 .is_some_and(|(_, path)| path.send_new_challenge)
6089 || !path.data.path_responses.is_empty()
6090 });
6091 let other = self.streams.can_send_stream_data()
6092 || self
6093 .datagrams
6094 .outgoing
6095 .front()
6096 .is_some_and(|x| x.size(true) <= max_size);
6097 SendableFrames {
6098 acks: false,
6099 other,
6100 close: false,
6101 path_exclusive,
6102 }
6103 }
6104
    /// Terminates the connection because of `reason`.
    ///
    /// Runs `close_common`, moves the connection state to drained carrying `reason`, and queues
    /// an `EndpointEventInner::Drained` event for the endpoint.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.state.move_to_drained(Some(reason));
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
6111
6112 pub fn current_mtu(&self) -> u16 {
6119 self.paths
6120 .iter()
6121 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6122 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6123 .min()
6124 .expect("There is always at least one available path")
6125 }
6126
6127 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6134 let pn_len = PacketNumber::new(
6135 pn,
6136 self.spaces[SpaceId::Data]
6137 .for_path(path)
6138 .largest_acked_packet
6139 .unwrap_or(0),
6140 )
6141 .len();
6142
6143 1 + self
6145 .rem_cids
6146 .get(&path)
6147 .map(|cids| cids.active().len())
6148 .unwrap_or(20) + pn_len
6150 + self.tag_len_1rtt()
6151 }
6152
6153 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6154 let pn_len = 4;
6155
6156 let cid_len = self
6157 .rem_cids
6158 .values()
6159 .map(|cids| cids.active().len())
6160 .max()
6161 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6165 }
6166
6167 fn tag_len_1rtt(&self) -> usize {
6168 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6169 Some(crypto) => Some(&*crypto.packet.local),
6170 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6171 };
6172 key.map_or(16, |x| x.tag_len())
6176 }
6177
6178 fn on_path_validated(&mut self, path_id: PathId) {
6180 self.path_data_mut(path_id).validated = true;
6181 let ConnectionSide::Server { server_config } = &self.side else {
6182 return;
6183 };
6184 let remote_addr = self.path_data(path_id).remote;
6185 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6186 new_tokens.clear();
6187 for _ in 0..server_config.validation_token.sent {
6188 new_tokens.push(remote_addr);
6189 }
6190 }
6191
6192 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6194 if let Some(path) = self.paths.get_mut(&path_id) {
6195 path.data.status.remote_update(status, status_seq_no);
6196 } else {
6197 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6198 }
6199 self.events.push_back(
6200 PathEvent::RemoteStatus {
6201 id: path_id,
6202 status,
6203 }
6204 .into(),
6205 );
6206 }
6207
6208 fn max_path_id(&self) -> Option<PathId> {
6217 if self.is_multipath_negotiated() {
6218 Some(self.remote_max_path_id.min(self.local_max_path_id))
6219 } else {
6220 None
6221 }
6222 }
6223
6224 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6226 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6227 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6228 };
6229 Ok(())
6230 }
6231
6232 pub fn remove_nat_traversal_address(
6236 &mut self,
6237 address: SocketAddr,
6238 ) -> Result<(), iroh_hp::Error> {
6239 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6240 self.spaces[SpaceId::Data]
6241 .pending
6242 .remove_address
6243 .insert(removed);
6244 }
6245 Ok(())
6246 }
6247
    /// Returns the local NAT traversal addresses as reported by the NAT traversal state.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the NAT traversal state.
    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        self.iroh_hp.get_local_nat_traversal_addresses()
    }
6252
6253 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6255 Ok(self
6256 .iroh_hp
6257 .client_side()?
6258 .get_remote_nat_traversal_addresses())
6259 }
6260
6261 fn open_nat_traversal_path(
6269 &mut self,
6270 now: Instant,
6271 (ip, port): (IpAddr, u16),
6272 ipv6: bool,
6273 ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6274 let remote = match ip {
6276 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6277 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6278 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6279 IpAddr::V6(_) => {
6280 trace!("not using IPv6 nat candidate for IPv4 socket");
6281 return Ok(None);
6282 }
6283 };
6284 match self.open_path_ensure(remote, PathStatus::Backup, now) {
6285 Ok((path_id, path_was_known)) => {
6286 if path_was_known {
6287 trace!(%path_id, %remote, "nat traversal: path existed for remote");
6288 }
6289 Ok(Some((path_id, remote, path_was_known)))
6290 }
6291 Err(e) => {
6292 debug!(%remote, %e, "nat traversal: failed to probe remote");
6293 Err(e)
6294 }
6295 }
6296 }
6297
6298 pub fn initiate_nat_traversal_round(
6308 &mut self,
6309 now: Instant,
6310 ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6311 if self.state.is_closed() {
6312 return Err(iroh_hp::Error::Closed);
6313 }
6314
6315 let client_state = self.iroh_hp.client_side_mut()?;
6316 let iroh_hp::NatTraversalRound {
6317 new_round,
6318 reach_out_at,
6319 addresses_to_probe,
6320 prev_round_path_ids,
6321 } = client_state.initiate_nat_traversal_round()?;
6322
6323 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6324
6325 for path_id in prev_round_path_ids {
6326 let validated = self
6329 .path(path_id)
6330 .map(|path| path.validated)
6331 .unwrap_or(false);
6332
6333 if !validated {
6334 let _ = self.close_path(
6335 now,
6336 path_id,
6337 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6338 );
6339 }
6340 }
6341
6342 let mut err = None;
6343
6344 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6345 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6346 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6347
6348 for (id, address) in addresses_to_probe {
6349 match self.open_nat_traversal_path(now, address, ipv6) {
6350 Ok(None) => {}
6351 Ok(Some((path_id, remote, path_was_known))) => {
6352 if !path_was_known {
6353 path_ids.push(path_id);
6354 probed_addresses.push(remote);
6355 }
6356 }
6357 Err(e) => {
6358 self.iroh_hp
6359 .client_side_mut()
6360 .expect("validated")
6361 .report_in_continuation(id, e);
6362 err.get_or_insert(e);
6363 }
6364 }
6365 }
6366
6367 if let Some(err) = err {
6368 if probed_addresses.is_empty() {
6370 return Err(iroh_hp::Error::Multipath(err));
6371 }
6372 }
6373
6374 self.iroh_hp
6375 .client_side_mut()
6376 .expect("connection side validated")
6377 .set_round_path_ids(path_ids);
6378
6379 Ok(probed_addresses)
6380 }
6381
6382 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6387 let client_state = self.iroh_hp.client_side_mut().ok()?;
6388 let (id, address) = client_state.continue_nat_traversal_round()?;
6389 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6390 let open_result = self.open_nat_traversal_path(now, address, ipv6);
6391 let client_state = self.iroh_hp.client_side_mut().expect("validated");
6392 match open_result {
6393 Ok(None) => Some(true),
6394 Ok(Some((path_id, _remote, path_was_known))) => {
6395 if !path_was_known {
6396 client_state.add_round_path_id(path_id);
6397 }
6398 Some(true)
6399 }
6400 Err(e) => {
6401 client_state.report_in_continuation(id, e);
6402 Some(false)
6403 }
6404 }
6405 }
6406}
6407
6408impl fmt::Debug for Connection {
6409 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6410 f.debug_struct("Connection")
6411 .field("handshake_cid", &self.handshake_cid)
6412 .finish()
6413 }
6414}
6415
/// Whether a path is blocked and, if so, by what.
///
/// NOTE(review): the variants presumably describe why sending on a path is currently not
/// possible — confirm at the use sites, which are outside this section.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Blocked by anti-amplification.
    AntiAmplification,
    /// Blocked by congestion.
    Congestion,
    /// Blocked by pacing.
    Pacing,
}
6423
/// Side-specific state held by a connection.
enum ConnectionSide {
    /// State held when this endpoint is the client.
    Client {
        /// Token taken from `token_store` for `server_name` when this state is built from
        /// `SideArgs`; the default value if the store had none.
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Server name used as the key into `token_store`.
        server_name: String,
    },
    /// State held when this endpoint is the server.
    Server {
        /// Server configuration of this connection.
        server_config: Arc<ServerConfig>,
    },
}
6436
6437impl ConnectionSide {
6438 fn remote_may_migrate(&self, state: &State) -> bool {
6439 match self {
6440 Self::Server { server_config } => server_config.migration,
6441 Self::Client { .. } => {
6442 if let Some(hs) = state.as_handshake() {
6443 hs.allow_server_migration
6444 } else {
6445 false
6446 }
6447 }
6448 }
6449 }
6450
6451 fn is_client(&self) -> bool {
6452 self.side().is_client()
6453 }
6454
6455 fn is_server(&self) -> bool {
6456 self.side().is_server()
6457 }
6458
6459 fn side(&self) -> Side {
6460 match *self {
6461 Self::Client { .. } => Side::Client,
6462 Self::Server { .. } => Side::Server,
6463 }
6464 }
6465}
6466
6467impl From<SideArgs> for ConnectionSide {
6468 fn from(side: SideArgs) -> Self {
6469 match side {
6470 SideArgs::Client {
6471 token_store,
6472 server_name,
6473 } => Self::Client {
6474 token: token_store.take(&server_name).unwrap_or_default(),
6475 token_store,
6476 server_name,
6477 },
6478 SideArgs::Server {
6479 server_config,
6480 pref_addr_cid: _,
6481 path_validated: _,
6482 } => Self::Server { server_config },
6483 }
6484 }
6485}
6486
/// Side-specific arguments used to construct a connection.
pub(crate) enum SideArgs {
    /// Arguments for a client-side connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server-side connection.
    Server {
        /// Server configuration of the connection.
        server_config: Arc<ServerConfig>,
        /// Connection ID for the preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already validated.
        path_validated: bool,
    },
}
6499
6500impl SideArgs {
6501 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6502 match *self {
6503 Self::Client { .. } => None,
6504 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6505 }
6506 }
6507
6508 pub(crate) fn path_validated(&self) -> bool {
6509 match *self {
6510 Self::Client { .. } => true,
6511 Self::Server { path_validated, .. } => path_validated,
6512 }
6513 }
6514
6515 pub(crate) fn side(&self) -> Side {
6516 match *self {
6517 Self::Client { .. } => Side::Client,
6518 Self::Server { .. } => Side::Server,
6519 }
6520 }
6521}
6522
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed transparently as the wrapped `TransportError`.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer sent a connection-level close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer sent an application-level close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// The connection ran out of connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
6557
6558impl From<Close> for ConnectionError {
6559 fn from(x: Close) -> Self {
6560 match x {
6561 Close::Connection(reason) => Self::ConnectionClosed(reason),
6562 Close::Application(reason) => Self::ApplicationClosed(reason),
6563 }
6564 }
6565}
6566
6567impl From<ConnectionError> for io::Error {
6569 fn from(x: ConnectionError) -> Self {
6570 use ConnectionError::*;
6571 let kind = match x {
6572 TimedOut => io::ErrorKind::TimedOut,
6573 Reset => io::ErrorKind::ConnectionReset,
6574 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6575 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6576 io::ErrorKind::Other
6577 }
6578 };
6579 Self::new(kind, x)
6580 }
6581}
6582
6583#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6586pub enum PathError {
6587 #[error("multipath extension not negotiated")]
6589 MultipathNotNegotiated,
6590 #[error("the server side may not open a path")]
6592 ServerSideNotAllowed,
6593 #[error("maximum number of concurrent paths reached")]
6595 MaxPathIdReached,
6596 #[error("remoted CIDs exhausted")]
6598 RemoteCidsExhausted,
6599 #[error("path validation failed")]
6601 ValidationFailed,
6602 #[error("invalid remote address")]
6604 InvalidRemoteAddress(SocketAddr),
6605}
6606
/// Reasons closing a path can fail.
// NOTE(review): the function returning this error is outside this section; the variant notes
// are inferred from the names and messages — confirm.
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// The path is (presumably already) closed.
    #[error("closed path")]
    ClosedPath,
    /// The path is the last open path (presumably it therefore cannot be closed).
    #[error("last open path")]
    LastOpenPath,
}
6617
/// Error indicating that the multipath extension was not negotiated.
///
/// The only field is private, so code outside this module cannot build the type with a
/// struct literal.
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Zero-sized private marker; exists only to restrict construction.
    _private: (),
}
6623
/// Events reported by a connection.
///
/// NOTE(review): the places where these events are emitted are outside this section; the
/// per-variant notes are inferred from variant names and payload types — confirm at emitters.
#[derive(Debug)]
pub enum Event {
    /// Handshake data has become available.
    HandshakeDataReady,
    /// The connection has been established.
    Connected,
    /// The handshake has been confirmed.
    HandshakeConfirmed,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// A datagram has been received.
    DatagramReceived,
    /// Datagram sending is no longer blocked.
    DatagramsUnblocked,
    /// A path-related event; `From<PathEvent>` produces this variant.
    Path(PathEvent),
    /// A NAT-traversal event from the `iroh_hp` module.
    NatTraversal(iroh_hp::Event),
}
6651
6652impl From<PathEvent> for Event {
6653 fn from(source: PathEvent) -> Self {
6654 Self::Path(source)
6655 }
6656}
6657
6658fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6659 Duration::from_micros(params.max_ack_delay.0 * 1000)
6660}
6661
/// Upper bound for a backoff exponent.
// NOTE(review): the use sites are outside this section; presumably caps exponential timer
// backoff so the shift/multiplication cannot overflow — confirm.
const MAX_BACKOFF_EXPONENT: u32 = 16;
6664
/// Minimum packet space in bytes: the largest Handshake/0-RTT header plus 32 further bytes.
// NOTE(review): presumably the threshold below which no new packet is started in the remaining
// space of a datagram, with the 32 bytes covering payload and AEAD tag — confirm at use sites.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6673
/// Upper bound, in bytes, on the size of a Handshake or 0-RTT packet header.
///
/// The terms, in order, appear to follow the QUIC long-header layout (assumption based on
/// RFC 9000 §17.2 — TODO confirm): flags byte (1) + version (4) + DCID length (1) + DCID
/// (`MAX_CID_SIZE`) + SCID length (1) + SCID (`MAX_CID_SIZE`) + the varint encoding of a
/// `u16::MAX` length field + packet number (4).
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6681
/// Safety margin used when deciding to update keys.
// NOTE(review): presumably a packet count — keys are updated this many packets before a usage
// limit is reached; the use site is outside this section, confirm there.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6686
/// Bookkeeping about a set of sent frames.
// NOTE(review): presumably describes the frames written into a single packet — confirm with the
// code that fills this in.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable data; must be empty for `is_ack_only` to hold.
    retransmits: ThinRetransmits,
    /// A `u64` recorded per path; must be non-empty for `is_ack_only` to hold.
    // NOTE(review): presumably the largest packet number acknowledged on each path — confirm.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of stream frames; must be empty for `is_ack_only` to hold.
    stream_frames: StreamMetaVec,
    /// Must be `false` for `is_ack_only` to hold.
    // NOTE(review): presumably set when frames that are never retransmitted were included.
    non_retransmits: bool,
    /// Not consulted by `is_ack_only`.
    // NOTE(review): presumably requests padding of the containing datagram — confirm at the
    // use site.
    requires_padding: bool,
}
6698
6699impl SentFrames {
6700 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6702 !self.largest_acked.is_empty()
6703 && !self.non_retransmits
6704 && self.stream_frames.is_empty()
6705 && self.retransmits.is_empty(streams)
6706 }
6707}
6708
6709fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6717 match (x, y) {
6718 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6719 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6720 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6721 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6722 }
6723}
6724
#[cfg(test)]
mod tests {
    use super::*;

    /// The negotiated idle timeout must not depend on the order of the two arguments.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let millis = |ms: u64| Some(Duration::from_millis(ms));

        // (one side, other side, expected negotiated timeout)
        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, Some(VarInt(0)), None),
            (None, Some(VarInt(2)), millis(2)),
            (Some(VarInt(0)), Some(VarInt(0)), None),
            (Some(VarInt(2)), Some(VarInt(0)), millis(2)),
            (Some(VarInt(1)), Some(VarInt(4)), millis(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}