1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23 TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 coding::{BufMutExt, Encodable},
27 config::{ServerConfig, TransportConfig},
28 congestion::Controller,
29 connection::{
30 qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
31 spaces::LostPacket,
32 timer::{ConnTimer, PathTimer},
33 },
34 crypto::{self, KeyPair, Keys, PacketKey},
35 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
36 iroh_hp,
37 packet::{
38 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
39 PacketNumber, PartialDecode, SpaceId,
40 },
41 range_set::ArrayRangeSet,
42 shared::{
43 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44 EndpointEvent, EndpointEventInner,
45 },
46 token::{ResetToken, Token, TokenPayload},
47 transport_parameters::TransportParameters,
48};
49
50mod ack_frequency;
51use ack_frequency::AckFrequencyState;
52
53mod assembler;
54pub use assembler::Chunk;
55
56mod cid_state;
57use cid_state::CidState;
58
59mod datagrams;
60use datagrams::DatagramState;
61pub use datagrams::{Datagrams, SendDatagramError};
62
63mod mtud;
64mod pacing;
65
66mod packet_builder;
67use packet_builder::{PacketBuilder, PadDatagram};
68
69mod packet_crypto;
70use packet_crypto::{PrevCrypto, ZeroRttCrypto};
71
72mod paths;
73pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
74use paths::{PathData, PathState};
75
76pub(crate) mod qlog;
77pub(crate) mod send_buffer;
78
79mod spaces;
80#[cfg(fuzzing)]
81pub use spaces::Retransmits;
82#[cfg(not(fuzzing))]
83use spaces::Retransmits;
84use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
85
86mod stats;
87pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
88
89mod streams;
90#[cfg(fuzzing)]
91pub use streams::StreamsState;
92#[cfg(not(fuzzing))]
93use streams::StreamsState;
94pub use streams::{
95 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
96 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
97};
98
99mod timer;
100use timer::{Timer, TimerTable};
101
102mod transmit_buf;
103use transmit_buf::TransmitBuf;
104
105mod state;
106
107#[cfg(not(fuzzing))]
108use state::State;
109#[cfg(fuzzing)]
110pub use state::State;
111use state::StateType;
112
/// Protocol state for a single connection.
///
/// Bundles the connection's configuration, crypto session, per-path state, packet spaces,
/// timers, stream and datagram state, and statistics. Values are built by `Connection::new`.
pub struct Connection {
    /// Endpoint-wide configuration handed to `new`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration for this connection.
    config: Arc<TransportConfig>,
    /// Connection-local random number generator, seeded from the `rng_seed` passed to `new`.
    rng: StdRng,
    /// Cryptographic session; `new` derives the Initial-space keys from it.
    crypto: Box<dyn crypto::Session>,
    /// Local connection ID passed to `new` as `loc_cid`.
    handshake_cid: ConnectionId,
    /// Remote connection ID passed to `new` as `rem_cid`.
    rem_handshake_cid: ConnectionId,
    /// Local IP address, if known; copied into `Transmit::src_ip` by `poll_transmit`.
    local_ip: Option<IpAddr>,
    /// State of every known path, ordered by path ID. `new` always creates `PathId::ZERO`.
    paths: BTreeMap<PathId, PathState>,
    /// Counter bumped (wrapping) each time `ensure_path` creates a path; the new value is
    /// handed to `PathData::new`.
    path_counter: u64,
    /// Whether MTU discovery is allowed; forwarded to `PathData::new`.
    allow_mtud: bool,
    /// Current connection state; starts out in the handshake state.
    state: State,
    /// Side-specific state, built from the `SideArgs` passed to `new`.
    side: ConnectionSide,
    /// Starts as `false`.
    /// NOTE(review): presumably whether 0-RTT may currently be used - confirm.
    zero_rtt_enabled: bool,
    /// 0-RTT crypto material, if any; starts as `None`.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Starts as `false`.
    /// NOTE(review): presumably the current key phase bit - confirm.
    key_phase: bool,
    /// Initialised in `new` to a random value in `10..1000`.
    /// NOTE(review): presumably the packet budget of a key phase - confirm.
    key_phase_size: u64,
    /// Transport parameters of the peer; default-initialised in `new`.
    peer_params: TransportParameters,
    /// Set to `rem_cid` in `new`.
    orig_rem_cid: ConnectionId,
    /// Set to `init_cid` in `new`; the same CID is used to derive the Initial keys.
    initial_dst_cid: ConnectionId,
    /// Starts as `None`.
    /// NOTE(review): presumably the source CID of a received Retry packet - confirm.
    retry_src_cid: Option<ConnectionId>,
    /// Queued application-facing events, drained first by `poll`.
    events: VecDeque<Event>,
    /// Queued endpoint-facing events, drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Chosen in `new`: requires `config.allow_spin` and a random draw with ratio 7/8.
    spin_enabled: bool,
    /// Starts as `false`.
    /// NOTE(review): presumably the current spin bit value - confirm.
    spin: bool,
    /// Packet spaces in the order Initial, Handshake, Data; indexed by `SpaceId`.
    spaces: [PacketSpace; 3],
    /// Starts as `SpaceId::Initial`. `poll_transmit` compares against it to decide when the
    /// close packet has been sent in the last relevant space.
    highest_space: SpaceId,
    /// Starts as `None`.
    /// NOTE(review): presumably the keys of the previous key phase - confirm.
    prev_crypto: Option<PrevCrypto>,
    /// Starts as `None`.
    /// NOTE(review): presumably the keys for the next key phase - confirm.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Starts as `false`.
    accepted_0rtt: bool,
    /// Starts as `true`.
    permit_idle_reset: bool,
    /// Idle timeout derived from `config.max_idle_timeout` (milliseconds); `None` when that
    /// setting is absent or zero.
    idle_timeout: Option<Duration>,
    /// Table of armed timers; `poll_timeout` peeks into it.
    timers: TimerTable,
    /// Starts at 0.
    authentication_failures: u64,

    /// While the connection is draining or closed, `poll_transmit` only produces a close packet
    /// if this flag is set, and clears it once the close was sent in `highest_space`.
    close: bool,

    /// ACK-frequency state, initialised from the max ACK delay of default transport parameters.
    ack_frequency: AckFrequencyState,

    /// Starts as `false`; passed to `populate_acks` when building ACK frames.
    receiving_ecn: bool,
    /// Starts at 0.
    total_authed_packets: u64,
    /// Updated by `poll_transmit`: set when nothing was written and no path was congestion
    /// blocked.
    app_limited: bool,

    /// Starts at 0.
    /// NOTE(review): presumably the sequence number for the next observed-address frame - confirm.
    next_observed_addr_seq_no: VarInt,

    /// Stream state, exposed through `streams`, `recv_stream` and `send_stream`.
    streams: StreamsState,
    /// Remote connection IDs per path. A path without an entry cannot be opened or sent on.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Local connection ID state per path; `new` creates the entry for `PathId::ZERO`.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// Datagram state.
    datagrams: DatagramState,
    /// Connection-wide statistics.
    stats: ConnectionStats,
    /// Per-path statistics.
    path_stats: FxHashMap<PathId, PathStats>,
    /// Protocol version passed to `new`.
    version: u32,

    /// Starts at `NonZeroU32::MIN`.
    max_concurrent_paths: NonZeroU32,
    /// Starts at `PathId::ZERO`; `close_path` raises it by one via `set_max_path_id`.
    local_max_path_id: PathId,
    /// Starts at `PathId::ZERO`; `open_path` refuses path IDs above it.
    remote_max_path_id: PathId,
    /// Starts at `PathId::ZERO`.
    /// NOTE(review): presumably the highest path ID for which CIDs were issued - confirm.
    max_path_id_with_cids: PathId,
    /// IDs of paths abandoned through `close_path`.
    abandoned_paths: FxHashSet<PathId>,

    /// Hole-punching state; `poll_transmit` asks its server side for pending probes.
    iroh_hp: iroh_hp::State,
    /// Sink for qlog events.
    qlog: QlogSink,
}
320
321impl Connection {
    /// Creates the state for a new connection.
    ///
    /// Sets up the Initial, Handshake and Data packet spaces, installs the Initial keys derived
    /// from `init_cid`, and registers a single open path, `PathId::ZERO`, towards `remote`. The
    /// connection starts in the handshake state.
    ///
    /// If `side_args` reports the path as validated, `on_path_validated` is invoked for path 0.
    /// On the client side, `write_crypto` and `init_0rtt` are called before returning. Finally a
    /// qlog "tuple assigned" event is emitted for path 0.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        // Read everything needed from `side_args` before the conversion below consumes it.
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        // The RNG is seeded from `rng_seed`, so the order of the draws below determines the
        // values they produce.
        let mut rng = StdRng::from_seed(rng_seed);
        // The Initial space is the only one whose keys are installed here.
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        // Test builds may request deterministic packet numbers for the Data space.
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        // The last argument is 2 when `side_args` supplied a preferred-address CID and 1
        // otherwise. NOTE(review): presumably the number of CIDs already issued - confirm
        // against `CidState::new`.
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        // Path 0 is created directly (not through `ensure_path`) and is marked open at once.
        let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
        path.open = true;
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_counter: 0,
            allow_mtud,
            local_ip,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            // Random value in `10..1000`, drawn from the seeded RNG.
            key_phase_size: rng.random_range(10..1000),
            // Placeholder defaults.
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // Needs `allow_spin`; the RNG is only consulted (ratio 7/8) when that is set.
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            // An absent or zero `max_idle_timeout` means no idle timeout; any other value is
            // interpreted as milliseconds.
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            // Derived from default transport parameters, matching the placeholder `peer_params`.
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            // `config` and `rng` are moved only after every initialiser that reads them.
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
        this
    }
474
    /// Returns the result of peeking at this connection's timer table.
    ///
    /// NOTE(review): presumably the earliest instant at which an armed timer expires, or `None`
    /// when no timer is armed - confirm against `TimerTable::peek`.
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }
486
487 #[must_use]
493 pub fn poll(&mut self) -> Option<Event> {
494 if let Some(x) = self.events.pop_front() {
495 return Some(x);
496 }
497
498 if let Some(event) = self.streams.poll() {
499 return Some(Event::Stream(event));
500 }
501
502 if let Some(reason) = self.state.take_error() {
503 return Some(Event::ConnectionLost { reason });
504 }
505
506 None
507 }
508
509 #[must_use]
511 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
512 self.endpoint_events.pop_front().map(EndpointEvent)
513 }
514
515 #[must_use]
517 pub fn streams(&mut self) -> Streams<'_> {
518 Streams {
519 state: &mut self.streams,
520 conn_state: &self.state,
521 }
522 }
523
524 #[must_use]
526 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
527 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
528 RecvStream {
529 id,
530 state: &mut self.streams,
531 pending: &mut self.spaces[SpaceId::Data].pending,
532 }
533 }
534
535 #[must_use]
537 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
538 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
539 SendStream {
540 id,
541 state: &mut self.streams,
542 pending: &mut self.spaces[SpaceId::Data].pending,
543 conn_state: &self.state,
544 }
545 }
546
547 pub fn open_path_ensure(
554 &mut self,
555 remote: SocketAddr,
556 initial_status: PathStatus,
557 now: Instant,
558 ) -> Result<(PathId, bool), PathError> {
559 match self
560 .paths
561 .iter()
562 .find(|(_id, path)| path.data.remote == remote)
563 {
564 Some((path_id, _state)) => Ok((*path_id, true)),
565 None => self
566 .open_path(remote, initial_status, now)
567 .map(|id| (id, false)),
568 }
569 }
570
571 pub fn open_path(
576 &mut self,
577 remote: SocketAddr,
578 initial_status: PathStatus,
579 now: Instant,
580 ) -> Result<PathId, PathError> {
581 if !self.is_multipath_negotiated() {
582 return Err(PathError::MultipathNotNegotiated);
583 }
584 if self.side().is_server() {
585 return Err(PathError::ServerSideNotAllowed);
586 }
587
588 let max_abandoned = self.abandoned_paths.iter().max().copied();
589 let max_used = self.paths.keys().last().copied();
590 let path_id = max_abandoned
591 .max(max_used)
592 .unwrap_or(PathId::ZERO)
593 .saturating_add(1u8);
594
595 if Some(path_id) > self.max_path_id() {
596 return Err(PathError::MaxPathIdReached);
597 }
598 if path_id > self.remote_max_path_id {
599 self.spaces[SpaceId::Data].pending.paths_blocked = true;
600 return Err(PathError::MaxPathIdReached);
601 }
602 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
603 self.spaces[SpaceId::Data]
604 .pending
605 .path_cids_blocked
606 .push(path_id);
607 return Err(PathError::RemoteCidsExhausted);
608 }
609
610 let path = self.ensure_path(path_id, remote, now, None);
611 path.status.local_update(initial_status);
612
613 Ok(path_id)
614 }
615
    /// Abandons the path `path_id`, queueing a path-abandon entry carrying `error_code`.
    ///
    /// Besides queueing the abandon entry this drops everything still queued for the path (new
    /// CIDs, CIDs-blocked and path-status entries, both pending and attached to sent packets),
    /// forgets the path's remote CIDs, asks the endpoint to retire the reset token, raises the
    /// local maximum path ID by one and arms the `DiscardPath` timer.
    ///
    /// # Errors
    ///
    /// - `ClosePathError::ClosedPath` if the path is already abandoned, lies above
    ///   `max_path_id()`, or is unknown.
    /// - `ClosePathError::LastOpenPath` if no other path exists that is validated and not
    ///   abandoned.
    pub fn close_path(
        &mut self,
        now: Instant,
        path_id: PathId,
        error_code: VarInt,
    ) -> Result<(), ClosePathError> {
        if self.abandoned_paths.contains(&path_id)
            || Some(path_id) > self.max_path_id()
            || !self.paths.contains_key(&path_id)
        {
            return Err(ClosePathError::ClosedPath);
        }
        // Refuse to close the path unless at least one *other* validated, non-abandoned path
        // would remain.
        if self
            .paths
            .iter()
            .any(|(id, path)| {
                *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
            })
            .not()
        {
            return Err(ClosePathError::LastOpenPath);
        }

        // Queue the abandon entry for the peer.
        self.spaces[SpaceId::Data]
            .pending
            .path_abandon
            .insert(path_id, error_code.into());

        // Drop everything still pending that refers to the abandoned path.
        let pending_space = &mut self.spaces[SpaceId::Data].pending;
        pending_space.new_cids.retain(|cid| cid.path_id != path_id);
        pending_space.path_cids_blocked.retain(|&id| id != path_id);
        pending_space.path_status.retain(|&id| id != path_id);

        // Do the same for retransmittable data attached to packets already sent, on any path.
        for space in self.spaces[SpaceId::Data].iter_paths_mut() {
            for sent_packet in space.sent_packets.values_mut() {
                if let Some(retransmits) = sent_packet.retransmits.get_mut() {
                    retransmits.new_cids.retain(|cid| cid.path_id != path_id);
                    retransmits.path_cids_blocked.retain(|&id| id != path_id);
                    retransmits.path_status.retain(|&id| id != path_id);
                }
            }
        }

        // Forget the peer's CIDs for this path and tell the endpoint to retire its reset token.
        self.rem_cids.remove(&path_id);
        self.endpoint_events
            .push_back(EndpointEventInner::RetireResetToken(path_id));

        // Computed before the path is recorded as abandoned below.
        let pto = self.pto_max_path(SpaceId::Data, false);

        let path = self.paths.get_mut(&path_id).expect("checked above");

        // Record `now + 3 * pto` as the last instant at which receiving on this path is allowed.
        path.data.last_allowed_receive = Some(now + 3 * pto);
        self.abandoned_paths.insert(path_id);

        // NOTE(review): presumably lets the peer open a replacement path - confirm.
        self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));

        // The path state itself is kept until the `DiscardPath` timer fires at `now + 6 * pto`.
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::DiscardPath),
            now + 6 * pto,
            self.qlog.with_time(now),
        );
        Ok(())
    }
698
699 #[track_caller]
703 fn path_data(&self, path_id: PathId) -> &PathData {
704 if let Some(data) = self.paths.get(&path_id) {
705 &data.data
706 } else {
707 panic!(
708 "unknown path: {path_id}, currently known paths: {:?}",
709 self.paths.keys().collect::<Vec<_>>()
710 );
711 }
712 }
713
714 fn path(&self, path_id: PathId) -> Option<&PathData> {
716 self.paths.get(&path_id).map(|path_state| &path_state.data)
717 }
718
719 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
721 self.paths
722 .get_mut(&path_id)
723 .map(|path_state| &mut path_state.data)
724 }
725
726 pub fn paths(&self) -> Vec<PathId> {
730 self.paths.keys().copied().collect()
731 }
732
733 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
735 self.path(path_id)
736 .map(PathData::local_status)
737 .ok_or(ClosedPath { _private: () })
738 }
739
740 pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
742 self.path(path_id)
743 .map(|path| path.remote)
744 .ok_or(ClosedPath { _private: () })
745 }
746
747 pub fn set_path_status(
751 &mut self,
752 path_id: PathId,
753 status: PathStatus,
754 ) -> Result<PathStatus, SetPathStatusError> {
755 if !self.is_multipath_negotiated() {
756 return Err(SetPathStatusError::MultipathNotNegotiated);
757 }
758 let path = self
759 .path_mut(path_id)
760 .ok_or(SetPathStatusError::ClosedPath)?;
761 let prev = match path.status.local_update(status) {
762 Some(prev) => {
763 self.spaces[SpaceId::Data]
764 .pending
765 .path_status
766 .insert(path_id);
767 prev
768 }
769 None => path.local_status(),
770 };
771 Ok(prev)
772 }
773
774 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
779 self.path(path_id).and_then(|path| path.remote_status())
780 }
781
782 pub fn set_path_max_idle_timeout(
788 &mut self,
789 path_id: PathId,
790 timeout: Option<Duration>,
791 ) -> Result<Option<Duration>, ClosedPath> {
792 let path = self
793 .paths
794 .get_mut(&path_id)
795 .ok_or(ClosedPath { _private: () })?;
796 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
797 }
798
799 pub fn set_path_keep_alive_interval(
805 &mut self,
806 path_id: PathId,
807 interval: Option<Duration>,
808 ) -> Result<Option<Duration>, ClosedPath> {
809 let path = self
810 .paths
811 .get_mut(&path_id)
812 .ok_or(ClosedPath { _private: () })?;
813 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
814 }
815
816 #[track_caller]
820 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
821 &mut self.paths.get_mut(&path_id).expect("known path").data
822 }
823
    /// Returns the data of path `path_id`, creating the path first if it does not exist yet.
    ///
    /// For a path that already exists the stored data is returned unchanged; `remote`, `now` and
    /// `pn` are ignored in that case.
    ///
    /// Creating a path:
    /// - bumps `path_counter` and builds fresh `PathData` for `remote`,
    /// - arms the `PathOpen` timer at `now + 3 * pto`,
    /// - flags the path to send a new challenge,
    /// - registers a Data packet-number space for the path, pre-seeding its duplicate filter
    ///   with `pn` when one is given,
    /// - emits a qlog "tuple assigned" event.
    fn ensure_path(
        &mut self,
        path_id: PathId,
        remote: SocketAddr,
        now: Instant,
        pn: Option<u64>,
    ) -> &mut PathData {
        // Existing paths are returned as-is; only a vacant slot continues below.
        let vacant_entry = match self.paths.entry(path_id) {
            btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
            btree_map::Entry::Occupied(occupied_entry) => {
                return &mut occupied_entry.into_mut().data;
            }
        };

        debug!(%path_id, ?remote, "path added");
        // The peer's limit may not fit into a `u16`; clamp it to `u16::MAX` in that case.
        let peer_max_udp_payload_size =
            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
        self.path_counter = self.path_counter.wrapping_add(1);
        let mut data = PathData::new(
            remote,
            self.allow_mtud,
            Some(peer_max_udp_payload_size),
            self.path_counter,
            now,
            &self.config,
        );

        // PTO for the new path: the max ACK delay used for PTO plus the base PTO of the path's
        // freshly created RTT estimate.
        let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
        self.timers.set(
            Timer::PerPath(path_id, PathTimer::PathOpen),
            now + 3 * pto,
            self.qlog.with_time(now),
        );

        data.send_new_challenge = true;

        let path = vacant_entry.insert(PathState { data, prev: None });

        // Every path gets its own packet-number space within the Data space.
        let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
        if let Some(pn) = pn {
            // NOTE(review): presumably the number of the packet that caused this path to be
            // created, so that it is treated as already received - confirm with callers.
            pn_space.dedup.insert(pn);
        }
        self.spaces[SpaceId::Data]
            .number_spaces
            .insert(path_id, pn_space);
        self.qlog.emit_tuple_assigned(path_id, remote, now);
        &mut path.data
    }
876
877 #[must_use]
887 pub fn poll_transmit(
888 &mut self,
889 now: Instant,
890 max_datagrams: NonZeroUsize,
891 buf: &mut Vec<u8>,
892 ) -> Option<Transmit> {
893 if let Some(probing) = self
894 .iroh_hp
895 .server_side_mut()
896 .ok()
897 .and_then(iroh_hp::ServerState::next_probe)
898 {
899 let destination = probing.remote();
900 trace!(%destination, "RAND_DATA packet");
901 let token: u64 = self.rng.random();
902 buf.put_u64(token);
903 probing.finish(token);
904 return Some(Transmit {
905 destination,
906 ecn: None,
907 size: 8,
908 segment_size: None,
909 src_ip: None,
910 });
911 }
912
913 let max_datagrams = match self.config.enable_segmentation_offload {
914 false => NonZeroUsize::MIN,
915 true => max_datagrams,
916 };
917
918 let close = match self.state.as_type() {
937 StateType::Drained => {
938 self.app_limited = true;
939 return None;
940 }
941 StateType::Draining | StateType::Closed => {
942 if !self.close {
945 self.app_limited = true;
946 return None;
947 }
948 true
949 }
950 _ => false,
951 };
952
953 if let Some(config) = &self.config.ack_frequency_config {
955 let rtt = self
956 .paths
957 .values()
958 .map(|p| p.data.rtt.get())
959 .min()
960 .expect("one path exists");
961 self.spaces[SpaceId::Data].pending.ack_frequency = self
962 .ack_frequency
963 .should_send_ack_frequency(rtt, config, &self.peer_params)
964 && self.highest_space == SpaceId::Data
965 && self.peer_supports_ack_frequency();
966 }
967
968 let mut coalesce = true;
970
971 let mut pad_datagram = PadDatagram::No;
974
975 let mut congestion_blocked = false;
979
980 let mut last_packet_number = None;
982
983 let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
984
985 let have_available_path = self.paths.iter().any(|(id, path)| {
988 path.data.validated
989 && path.data.local_status() == PathStatus::Available
990 && self.rem_cids.contains_key(id)
991 });
992
993 let mut transmit = TransmitBuf::new(
995 buf,
996 max_datagrams,
997 self.path_data(path_id).current_mtu().into(),
998 );
999 if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1000 return Some(challenge);
1001 }
1002 let mut space_id = match path_id {
1003 PathId::ZERO => SpaceId::Initial,
1004 _ => SpaceId::Data,
1005 };
1006
1007 loop {
1008 let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1010 let err = PathError::RemoteCidsExhausted;
1011 if !self.abandoned_paths.contains(&path_id) {
1012 debug!(?err, %path_id, "no active CID for path");
1013 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1014 id: path_id,
1015 error: err,
1016 }));
1017 self.close_path(
1021 now,
1022 path_id,
1023 TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1024 )
1025 .ok();
1026 self.spaces[SpaceId::Data]
1027 .pending
1028 .path_cids_blocked
1029 .push(path_id);
1030 } else {
1031 trace!(%path_id, "remote CIDs retired for abandoned path");
1032 }
1033
1034 match self.paths.keys().find(|&&next| next > path_id) {
1035 Some(next_path_id) => {
1036 path_id = *next_path_id;
1038 space_id = SpaceId::Data;
1039
1040 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1042 if let Some(challenge) =
1043 self.send_prev_path_challenge(now, &mut transmit, path_id)
1044 {
1045 return Some(challenge);
1046 }
1047
1048 continue;
1049 }
1050 None => {
1051 trace!(
1053 ?space_id,
1054 %path_id,
1055 "no CIDs to send on path, no more paths"
1056 );
1057 break;
1058 }
1059 }
1060 };
1061
1062 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1065 transmit.datagram_remaining_mut()
1067 } else {
1068 transmit.segment_size()
1070 };
1071 let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1072 let path_should_send = {
1073 let path_exclusive_only = space_id == SpaceId::Data
1074 && have_available_path
1075 && self.path_data(path_id).local_status() == PathStatus::Backup;
1076 let path_should_send = if path_exclusive_only {
1077 can_send.path_exclusive
1078 } else {
1079 !can_send.is_empty()
1080 };
1081 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1082 path_should_send || needs_loss_probe
1083 };
1084
1085 if !path_should_send && space_id < SpaceId::Data {
1086 if self.spaces[space_id].crypto.is_some() {
1087 trace!(?space_id, %path_id, "nothing to send in space");
1088 }
1089 space_id = space_id.next();
1090 continue;
1091 }
1092
1093 let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1094 self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1096 } else {
1097 PathBlocked::No
1098 };
1099 if send_blocked != PathBlocked::No {
1100 trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1101 congestion_blocked = true;
1102 }
1103 if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1104 space_id = space_id.next();
1107 continue;
1108 }
1109 if !path_should_send || send_blocked != PathBlocked::No {
1110 if transmit.num_datagrams() > 0 {
1115 break;
1116 }
1117
1118 match self.paths.keys().find(|&&next| next > path_id) {
1119 Some(next_path_id) => {
1120 trace!(
1122 ?space_id,
1123 %path_id,
1124 %next_path_id,
1125 "nothing to send on path"
1126 );
1127 path_id = *next_path_id;
1128 space_id = SpaceId::Data;
1129
1130 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1132 if let Some(challenge) =
1133 self.send_prev_path_challenge(now, &mut transmit, path_id)
1134 {
1135 return Some(challenge);
1136 }
1137
1138 continue;
1139 }
1140 None => {
1141 trace!(
1143 ?space_id,
1144 %path_id,
1145 next_path_id=?None::<PathId>,
1146 "nothing to send on path"
1147 );
1148 break;
1149 }
1150 }
1151 }
1152
1153 if transmit.datagram_remaining_mut() == 0 {
1155 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1156 break;
1158 }
1159
1160 match self.spaces[space_id].for_path(path_id).loss_probes {
1161 0 => transmit.start_new_datagram(),
1162 _ => {
1163 let request_immediate_ack =
1165 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1166 self.spaces[space_id].maybe_queue_probe(
1167 path_id,
1168 request_immediate_ack,
1169 &self.streams,
1170 );
1171
1172 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1173
1174 transmit.start_new_datagram_with_size(std::cmp::min(
1178 usize::from(INITIAL_MTU),
1179 transmit.segment_size(),
1180 ));
1181 }
1182 }
1183 trace!(count = transmit.num_datagrams(), "new datagram started");
1184 coalesce = true;
1185 pad_datagram = PadDatagram::No;
1186 }
1187
1188 if transmit.datagram_start_offset() < transmit.len() {
1191 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1192 }
1193
1194 if self.spaces[SpaceId::Initial].crypto.is_some()
1199 && space_id == SpaceId::Handshake
1200 && self.side.is_client()
1201 {
1202 self.discard_space(now, SpaceId::Initial);
1205 }
1206 if let Some(ref mut prev) = self.prev_crypto {
1207 prev.update_unacked = false;
1208 }
1209
1210 let mut qlog = QlogSentPacket::default();
1211 let mut builder = PacketBuilder::new(
1212 now,
1213 space_id,
1214 path_id,
1215 remote_cid,
1216 &mut transmit,
1217 can_send.other,
1218 self,
1219 &mut qlog,
1220 )?;
1221 last_packet_number = Some(builder.exact_number);
1222 coalesce = coalesce && !builder.short_header;
1223
1224 if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1225 pad_datagram |= PadDatagram::ToMinMtu;
1227 }
1228 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1229 pad_datagram |= PadDatagram::ToSegmentSize;
1230 }
1231
1232 if can_send.close {
1233 trace!("sending CONNECTION_CLOSE");
1234 let mut sent_frames = SentFrames::default();
1239 let is_multipath_negotiated = self.is_multipath_negotiated();
1240 for path_id in self.spaces[space_id]
1241 .number_spaces
1242 .iter()
1243 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1244 .map(|(&path_id, _)| path_id)
1245 .collect::<Vec<_>>()
1246 {
1247 Self::populate_acks(
1248 now,
1249 self.receiving_ecn,
1250 &mut sent_frames,
1251 path_id,
1252 space_id,
1253 &mut self.spaces[space_id],
1254 is_multipath_negotiated,
1255 &mut builder.frame_space_mut(),
1256 &mut self.stats,
1257 &mut qlog,
1258 );
1259 }
1260
1261 debug_assert!(
1265 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1266 "ACKs should leave space for ConnectionClose"
1267 );
1268 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1269 let max_frame_size = builder.frame_space_remaining();
1270 match self.state.as_type() {
1271 StateType::Closed => {
1272 let reason: Close =
1273 self.state.as_closed().expect("checked").clone().into();
1274 if space_id == SpaceId::Data || reason.is_transport_layer() {
1275 reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1276 qlog.frame(&Frame::Close(reason));
1277 } else {
1278 let frame = frame::ConnectionClose {
1279 error_code: TransportErrorCode::APPLICATION_ERROR,
1280 frame_type: frame::MaybeFrame::None,
1281 reason: Bytes::new(),
1282 };
1283 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1284 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1285 }
1286 }
1287 StateType::Draining => {
1288 let frame = frame::ConnectionClose {
1289 error_code: TransportErrorCode::NO_ERROR,
1290 frame_type: frame::MaybeFrame::None,
1291 reason: Bytes::new(),
1292 };
1293 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1294 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1295 }
1296 _ => unreachable!(
1297 "tried to make a close packet when the connection wasn't closed"
1298 ),
1299 };
1300 }
1301 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1302 if space_id == self.highest_space {
1303 self.close = false;
1306 break;
1308 } else {
1309 space_id = space_id.next();
1313 continue;
1314 }
1315 }
1316
1317 if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1320 let path = self.path_data_mut(path_id);
1321 if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1322 let response = frame::PathResponse(token);
1326 trace!(%response, "(off-path)");
1327 builder.frame_space_mut().write(response);
1328 qlog.frame(&Frame::PathResponse(response));
1329 self.stats.frame_tx.path_response += 1;
1330 builder.finish_and_track(
1331 now,
1332 self,
1333 path_id,
1334 SentFrames {
1335 non_retransmits: true,
1336 ..SentFrames::default()
1337 },
1338 PadDatagram::ToMinMtu,
1339 qlog,
1340 );
1341 self.stats.udp_tx.on_sent(1, transmit.len());
1342 return Some(Transmit {
1343 destination: remote,
1344 size: transmit.len(),
1345 ecn: None,
1346 segment_size: None,
1347 src_ip: self.local_ip,
1348 });
1349 }
1350 }
1351
1352 let sent_frames = {
1353 let path_exclusive_only = have_available_path
1354 && self.path_data(path_id).local_status() == PathStatus::Backup;
1355 let pn = builder.exact_number;
1356 self.populate_packet(
1357 now,
1358 space_id,
1359 path_id,
1360 path_exclusive_only,
1361 &mut builder.frame_space_mut(),
1362 pn,
1363 &mut qlog,
1364 )
1365 };
1366
1367 debug_assert!(
1374 !(sent_frames.is_ack_only(&self.streams)
1375 && !can_send.acks
1376 && can_send.other
1377 && builder.buf.segment_size()
1378 == self.path_data(path_id).current_mtu() as usize
1379 && self.datagrams.outgoing.is_empty()),
1380 "SendableFrames was {can_send:?}, but only ACKs have been written"
1381 );
1382 if sent_frames.requires_padding {
1383 pad_datagram |= PadDatagram::ToMinMtu;
1384 }
1385
1386 for (path_id, _pn) in sent_frames.largest_acked.iter() {
1387 self.spaces[space_id]
1388 .for_path(*path_id)
1389 .pending_acks
1390 .acks_sent();
1391 self.timers.stop(
1392 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1393 self.qlog.with_time(now),
1394 );
1395 }
1396
1397 if coalesce
1405 && builder
1406 .buf
1407 .datagram_remaining_mut()
1408 .saturating_sub(builder.predict_packet_end())
1409 > MIN_PACKET_SPACE
1410 && self
1411 .next_send_space(space_id, path_id, builder.buf, close)
1412 .is_some()
1413 {
1414 builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1417 } else {
1418 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1421 const MAX_PADDING: usize = 32;
1429 if builder.buf.datagram_remaining_mut()
1430 > builder.predict_packet_end() + MAX_PADDING
1431 {
1432 trace!(
1433 "GSO truncated by demand for {} padding bytes",
1434 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1435 );
1436 builder.finish_and_track(
1437 now,
1438 self,
1439 path_id,
1440 sent_frames,
1441 PadDatagram::No,
1442 qlog,
1443 );
1444 break;
1445 }
1446
1447 builder.finish_and_track(
1450 now,
1451 self,
1452 path_id,
1453 sent_frames,
1454 PadDatagram::ToSegmentSize,
1455 qlog,
1456 );
1457 } else {
1458 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1459 }
1460 if transmit.num_datagrams() == 1 {
1461 transmit.clip_datagram_size();
1462 }
1463 }
1464 }
1465
1466 if let Some(last_packet_number) = last_packet_number {
1467 self.path_data_mut(path_id).congestion.on_sent(
1470 now,
1471 transmit.len() as u64,
1472 last_packet_number,
1473 );
1474 }
1475
1476 self.qlog.emit_recovery_metrics(
1477 path_id,
1478 &mut self.paths.get_mut(&path_id).unwrap().data,
1479 now,
1480 );
1481
1482 self.app_limited = transmit.is_empty() && !congestion_blocked;
1483
1484 if transmit.is_empty() && self.state.is_established() {
1486 let space_id = SpaceId::Data;
1488 path_id = *self.paths.first_key_value().expect("one path must exist").0;
1489 let probe_data = loop {
1490 let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1496 let eligible = self.path_data(path_id).validated
1497 && !self.path_data(path_id).is_validating_path()
1498 && !self.abandoned_paths.contains(&path_id);
1499 let probe_size = eligible
1500 .then(|| {
1501 let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1502 self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1503 })
1504 .flatten();
1505 match (active_cid, probe_size) {
1506 (Some(active_cid), Some(probe_size)) => {
1507 break Some((active_cid, probe_size));
1509 }
1510 _ => {
1511 match self.paths.keys().find(|&&next| next > path_id) {
1513 Some(next) => {
1514 path_id = *next;
1515 continue;
1516 }
1517 None => break None,
1518 }
1519 }
1520 }
1521 };
1522 if let Some((active_cid, probe_size)) = probe_data {
1523 debug_assert_eq!(transmit.num_datagrams(), 0);
1525 transmit.start_new_datagram_with_size(probe_size as usize);
1526
1527 let mut qlog = QlogSentPacket::default();
1528 let mut builder = PacketBuilder::new(
1529 now,
1530 space_id,
1531 path_id,
1532 active_cid,
1533 &mut transmit,
1534 true,
1535 self,
1536 &mut qlog,
1537 )?;
1538
1539 trace!(?probe_size, "writing MTUD probe");
1541 trace!("PING");
1542 builder.frame_space_mut().write(frame::FrameType::Ping);
1543 qlog.frame(&Frame::Ping);
1544 self.stats.frame_tx.ping += 1;
1545
1546 if self.peer_supports_ack_frequency() {
1548 trace!("IMMEDIATE_ACK");
1549 builder
1550 .frame_space_mut()
1551 .write(frame::FrameType::ImmediateAck);
1552 self.stats.frame_tx.immediate_ack += 1;
1553 qlog.frame(&Frame::ImmediateAck);
1554 }
1555
1556 let sent_frames = SentFrames {
1557 non_retransmits: true,
1558 ..Default::default()
1559 };
1560 builder.finish_and_track(
1561 now,
1562 self,
1563 path_id,
1564 sent_frames,
1565 PadDatagram::ToSize(probe_size),
1566 qlog,
1567 );
1568
1569 self.path_stats
1570 .entry(path_id)
1571 .or_default()
1572 .sent_plpmtud_probes += 1;
1573 }
1574 }
1575
1576 if transmit.is_empty() {
1577 return None;
1578 }
1579
1580 let destination = self.path_data(path_id).remote;
1581 trace!(
1582 segment_size = transmit.segment_size(),
1583 last_datagram_len = transmit.len() % transmit.segment_size(),
1584 ?destination,
1585 "sending {} bytes in {} datagrams",
1586 transmit.len(),
1587 transmit.num_datagrams()
1588 );
1589 self.path_data_mut(path_id)
1590 .inc_total_sent(transmit.len() as u64);
1591
1592 self.stats
1593 .udp_tx
1594 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1595
1596 Some(Transmit {
1597 destination,
1598 size: transmit.len(),
1599 ecn: if self.path_data(path_id).sending_ecn {
1600 Some(EcnCodepoint::Ect0)
1601 } else {
1602 None
1603 },
1604 segment_size: match transmit.num_datagrams() {
1605 1 => None,
1606 _ => Some(transmit.segment_size()),
1607 },
1608 src_ip: self.local_ip,
1609 })
1610 }
1611
1612 fn next_send_space(
1617 &mut self,
1618 current_space_id: SpaceId,
1619 path_id: PathId,
1620 buf: &TransmitBuf<'_>,
1621 close: bool,
1622 ) -> Option<SpaceId> {
1623 let mut space_id = current_space_id;
1630 loop {
1631 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1632 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1633 return Some(space_id);
1634 }
1635 space_id = match space_id {
1636 SpaceId::Initial => SpaceId::Handshake,
1637 SpaceId::Handshake => SpaceId::Data,
1638 SpaceId::Data => break,
1639 }
1640 }
1641 None
1642 }
1643
1644 fn path_congestion_check(
1646 &mut self,
1647 space_id: SpaceId,
1648 path_id: PathId,
1649 transmit: &TransmitBuf<'_>,
1650 can_send: &SendableFrames,
1651 now: Instant,
1652 ) -> PathBlocked {
1653 if self.side().is_server()
1659 && self
1660 .path_data(path_id)
1661 .anti_amplification_blocked(transmit.len() as u64 + 1)
1662 {
1663 trace!(?space_id, %path_id, "blocked by anti-amplification");
1664 return PathBlocked::AntiAmplification;
1665 }
1666
1667 let bytes_to_send = transmit.segment_size() as u64;
1670 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1671
1672 if can_send.other && !need_loss_probe && !can_send.close {
1673 let path = self.path_data(path_id);
1674 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1675 trace!(?space_id, %path_id, "blocked by congestion control");
1676 return PathBlocked::Congestion;
1677 }
1678 }
1679
1680 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1682 self.timers.set(
1683 Timer::PerPath(path_id, PathTimer::Pacing),
1684 delay,
1685 self.qlog.with_time(now),
1686 );
1687 trace!(?space_id, %path_id, "blocked by pacing");
1690 return PathBlocked::Pacing;
1691 }
1692
1693 PathBlocked::No
1694 }
1695
1696 fn send_prev_path_challenge(
1701 &mut self,
1702 now: Instant,
1703 buf: &mut TransmitBuf<'_>,
1704 path_id: PathId,
1705 ) -> Option<Transmit> {
1706 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1707 if !prev_path.send_new_challenge {
1710 return None;
1711 };
1712 prev_path.send_new_challenge = false;
1713 let destination = prev_path.remote;
1714 let token = self.rng.random();
1715 let info = paths::SentChallengeInfo {
1716 sent_instant: now,
1717 remote: destination,
1718 };
1719 prev_path.challenges_sent.insert(token, info);
1720 debug_assert_eq!(
1721 self.highest_space,
1722 SpaceId::Data,
1723 "PATH_CHALLENGE queued without 1-RTT keys"
1724 );
1725 buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1726
1727 debug_assert_eq!(buf.datagram_start_offset(), 0);
1733 let mut qlog = QlogSentPacket::default();
1734 let mut builder = PacketBuilder::new(
1735 now,
1736 SpaceId::Data,
1737 path_id,
1738 *prev_cid,
1739 buf,
1740 false,
1741 self,
1742 &mut qlog,
1743 )?;
1744 let challenge = frame::PathChallenge(token);
1745 trace!(%challenge, "validating previous path");
1746 qlog.frame(&Frame::PathChallenge(challenge));
1747 builder.frame_space_mut().write(challenge);
1748 self.stats.frame_tx.path_challenge += 1;
1749
1750 builder.pad_to(MIN_INITIAL_SIZE);
1755
1756 builder.finish(self, now, qlog);
1757 self.stats.udp_tx.on_sent(1, buf.len());
1758
1759 Some(Transmit {
1760 destination,
1761 size: buf.len(),
1762 ecn: None,
1763 segment_size: None,
1764 src_ip: self.local_ip,
1765 })
1766 }
1767
1768 fn space_can_send(
1773 &mut self,
1774 space_id: SpaceId,
1775 path_id: PathId,
1776 packet_size: usize,
1777 close: bool,
1778 ) -> SendableFrames {
1779 let pn = self.spaces[SpaceId::Data]
1780 .for_path(path_id)
1781 .peek_tx_number();
1782 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1783 if self.spaces[space_id].crypto.is_none()
1784 && (space_id != SpaceId::Data
1785 || self.zero_rtt_crypto.is_none()
1786 || self.side.is_server())
1787 {
1788 return SendableFrames::empty();
1790 }
1791 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1792 if space_id == SpaceId::Data {
1793 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1794 }
1795
1796 can_send.close = close && self.spaces[space_id].crypto.is_some();
1797
1798 can_send
1799 }
1800
1801 pub fn handle_event(&mut self, event: ConnectionEvent) {
1807 use ConnectionEventInner::*;
1808 match event.0 {
1809 Datagram(DatagramConnectionEvent {
1810 now,
1811 remote,
1812 path_id,
1813 ecn,
1814 first_decode,
1815 remaining,
1816 }) => {
1817 let span = trace_span!("pkt", %path_id);
1818 let _guard = span.enter();
1819 if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
1823 if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
1824 trace!(
1825 %path_id,
1826 ?remote,
1827 path_remote = ?self.path(path_id).map(|p| p.remote),
1828 "discarding packet from unrecognized peer"
1829 );
1830 return;
1831 }
1832 }
1833
1834 let was_anti_amplification_blocked = self
1835 .path(path_id)
1836 .map(|path| path.anti_amplification_blocked(1))
1837 .unwrap_or(true); self.stats.udp_rx.datagrams += 1;
1841 self.stats.udp_rx.bytes += first_decode.len() as u64;
1842 let data_len = first_decode.len();
1843
1844 self.handle_decode(now, remote, path_id, ecn, first_decode);
1845 if let Some(path) = self.path_mut(path_id) {
1850 path.inc_total_recvd(data_len as u64);
1851 }
1852
1853 if let Some(data) = remaining {
1854 self.stats.udp_rx.bytes += data.len() as u64;
1855 self.handle_coalesced(now, remote, path_id, ecn, data);
1856 }
1857
1858 if let Some(path) = self.paths.get_mut(&path_id) {
1859 self.qlog
1860 .emit_recovery_metrics(path_id, &mut path.data, now);
1861 }
1862
1863 if was_anti_amplification_blocked {
1864 self.set_loss_detection_timer(now, path_id);
1868 }
1869 }
1870 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1871 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1872 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1873 let cid_state = self
1874 .local_cid_state
1875 .entry(path_id)
1876 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1877 cid_state.new_cids(&ids, now);
1878
1879 ids.into_iter().rev().for_each(|frame| {
1880 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1881 });
1882 self.reset_cid_retirement(now);
1884 }
1885 }
1886 }
1887
1888 pub fn handle_timeout(&mut self, now: Instant) {
1898 while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
1899 trace!(?timer, at=?now, "timeout");
1901 match timer {
1902 Timer::Conn(timer) => match timer {
1903 ConnTimer::Close => {
1904 self.state.move_to_drained(None);
1905 self.endpoint_events.push_back(EndpointEventInner::Drained);
1906 }
1907 ConnTimer::Idle => {
1908 self.kill(ConnectionError::TimedOut);
1909 }
1910 ConnTimer::KeepAlive => {
1911 trace!("sending keep-alive");
1912 self.ping();
1913 }
1914 ConnTimer::KeyDiscard => {
1915 self.zero_rtt_crypto = None;
1916 self.prev_crypto = None;
1917 }
1918 ConnTimer::PushNewCid => {
1919 while let Some((path_id, when)) = self.next_cid_retirement() {
1920 if when > now {
1921 break;
1922 }
1923 match self.local_cid_state.get_mut(&path_id) {
1924 None => error!(%path_id, "No local CID state for path"),
1925 Some(cid_state) => {
1926 let num_new_cid = cid_state.on_cid_timeout().into();
1928 if !self.state.is_closed() {
1929 trace!(
1930 "push a new CID to peer RETIRE_PRIOR_TO field {}",
1931 cid_state.retire_prior_to()
1932 );
1933 self.endpoint_events.push_back(
1934 EndpointEventInner::NeedIdentifiers(
1935 path_id,
1936 now,
1937 num_new_cid,
1938 ),
1939 );
1940 }
1941 }
1942 }
1943 }
1944 }
1945 },
1946 Timer::PerPath(path_id, timer) => {
1948 let span = trace_span!("per-path timer fired", %path_id, ?timer);
1949 let _guard = span.enter();
1950 match timer {
1951 PathTimer::PathIdle => {
1952 self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
1953 .ok();
1954 }
1955
1956 PathTimer::PathKeepAlive => {
1957 trace!("sending keep-alive on path");
1958 self.ping_path(path_id).ok();
1959 }
1960 PathTimer::LossDetection => {
1961 self.on_loss_detection_timeout(now, path_id);
1962 self.qlog.emit_recovery_metrics(
1963 path_id,
1964 &mut self.paths.get_mut(&path_id).unwrap().data,
1965 now,
1966 );
1967 }
1968 PathTimer::PathValidation => {
1969 let Some(path) = self.paths.get_mut(&path_id) else {
1970 continue;
1971 };
1972 self.timers.stop(
1973 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
1974 self.qlog.with_time(now),
1975 );
1976 debug!("path validation failed");
1977 if let Some((_, prev)) = path.prev.take() {
1978 path.data = prev;
1979 }
1980 path.data.challenges_sent.clear();
1981 path.data.send_new_challenge = false;
1982 }
1983 PathTimer::PathChallengeLost => {
1984 let Some(path) = self.paths.get_mut(&path_id) else {
1985 continue;
1986 };
1987 trace!("path challenge deemed lost");
1988 path.data.send_new_challenge = true;
1989 }
1990 PathTimer::PathOpen => {
1991 let Some(path) = self.path_mut(path_id) else {
1992 continue;
1993 };
1994 path.challenges_sent.clear();
1995 path.send_new_challenge = false;
1996 debug!("new path validation failed");
1997 if let Err(err) = self.close_path(
1998 now,
1999 path_id,
2000 TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
2001 ) {
2002 warn!(?err, "failed closing path");
2003 }
2004
2005 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
2006 id: path_id,
2007 error: PathError::ValidationFailed,
2008 }));
2009 }
2010 PathTimer::Pacing => trace!("pacing timer expired"),
2011 PathTimer::MaxAckDelay => {
2012 trace!("max ack delay reached");
2013 self.spaces[SpaceId::Data]
2015 .for_path(path_id)
2016 .pending_acks
2017 .on_max_ack_delay_timeout()
2018 }
2019 PathTimer::DiscardPath => {
2020 self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2023 if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
2024 let (min_seq, max_seq) = loc_cid_state.active_seq();
2025 for seq in min_seq..=max_seq {
2026 self.endpoint_events.push_back(
2027 EndpointEventInner::RetireConnectionId(
2028 now, path_id, seq, false,
2029 ),
2030 );
2031 }
2032 }
2033 self.discard_path(path_id, now);
2034 }
2035 }
2036 }
2037 }
2038 }
2039 }
2040
2041 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2053 self.close_inner(
2054 now,
2055 Close::Application(frame::ApplicationClose { error_code, reason }),
2056 )
2057 }
2058
2059 fn close_inner(&mut self, now: Instant, reason: Close) {
2060 let was_closed = self.state.is_closed();
2061 if !was_closed {
2062 self.close_common();
2063 self.set_close_timer(now);
2064 self.close = true;
2065 self.state.move_to_closed_local(reason);
2066 }
2067 }
2068
2069 pub fn datagrams(&mut self) -> Datagrams<'_> {
2071 Datagrams { conn: self }
2072 }
2073
2074 pub fn stats(&mut self) -> ConnectionStats {
2076 self.stats.clone()
2077 }
2078
2079 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2081 let path = self.paths.get(&path_id)?;
2082 let stats = self.path_stats.entry(path_id).or_default();
2083 stats.rtt = path.data.rtt.get();
2084 stats.cwnd = path.data.congestion.window();
2085 stats.current_mtu = path.data.mtud.current_mtu();
2086 Some(*stats)
2087 }
2088
2089 pub fn ping(&mut self) {
2093 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2096 path_data.ping_pending = true;
2097 }
2098 }
2099
2100 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2104 let path_data = self.spaces[self.highest_space]
2105 .number_spaces
2106 .get_mut(&path)
2107 .ok_or(ClosedPath { _private: () })?;
2108 path_data.ping_pending = true;
2109 Ok(())
2110 }
2111
2112 pub fn force_key_update(&mut self) {
2116 if !self.state.is_established() {
2117 debug!("ignoring forced key update in illegal state");
2118 return;
2119 }
2120 if self.prev_crypto.is_some() {
2121 debug!("ignoring redundant forced key update");
2124 return;
2125 }
2126 self.update_keys(None, false);
2127 }
2128
2129 #[doc(hidden)]
2131 #[deprecated]
2132 pub fn initiate_key_update(&mut self) {
2133 self.force_key_update();
2134 }
2135
2136 pub fn crypto_session(&self) -> &dyn crypto::Session {
2138 &*self.crypto
2139 }
2140
2141 pub fn is_handshaking(&self) -> bool {
2146 self.state.is_handshake()
2147 }
2148
2149 pub fn is_closed(&self) -> bool {
2157 self.state.is_closed()
2158 }
2159
2160 pub fn is_drained(&self) -> bool {
2165 self.state.is_drained()
2166 }
2167
2168 pub fn accepted_0rtt(&self) -> bool {
2172 self.accepted_0rtt
2173 }
2174
2175 pub fn has_0rtt(&self) -> bool {
2177 self.zero_rtt_enabled
2178 }
2179
2180 pub fn has_pending_retransmits(&self) -> bool {
2182 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2183 }
2184
2185 pub fn side(&self) -> Side {
2187 self.side.side()
2188 }
2189
2190 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2192 self.path(path_id)
2193 .map(|path_data| {
2194 path_data
2195 .last_observed_addr_report
2196 .as_ref()
2197 .map(|observed| observed.socket_addr())
2198 })
2199 .ok_or(ClosedPath { _private: () })
2200 }
2201
2202 pub fn local_ip(&self) -> Option<IpAddr> {
2212 self.local_ip
2213 }
2214
2215 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2217 self.path(path_id).map(|d| d.rtt.get())
2218 }
2219
2220 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2222 self.path(path_id).map(|d| d.congestion.as_ref())
2223 }
2224
2225 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2230 self.streams.set_max_concurrent(dir, count);
2231 let pending = &mut self.spaces[SpaceId::Data].pending;
2234 self.streams.queue_max_stream_id(pending);
2235 }
2236
2237 pub fn set_max_concurrent_paths(
2247 &mut self,
2248 now: Instant,
2249 count: NonZeroU32,
2250 ) -> Result<(), MultipathNotNegotiated> {
2251 if !self.is_multipath_negotiated() {
2252 return Err(MultipathNotNegotiated { _private: () });
2253 }
2254 self.max_concurrent_paths = count;
2255
2256 let in_use_count = self
2257 .local_max_path_id
2258 .next()
2259 .saturating_sub(self.abandoned_paths.len() as u32)
2260 .as_u32();
2261 let extra_needed = count.get().saturating_sub(in_use_count);
2262 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2263
2264 self.set_max_path_id(now, new_max_path_id);
2265
2266 Ok(())
2267 }
2268
2269 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2271 if max_path_id <= self.local_max_path_id {
2272 return;
2273 }
2274
2275 self.local_max_path_id = max_path_id;
2276 self.spaces[SpaceId::Data].pending.max_path_id = true;
2277
2278 self.issue_first_path_cids(now);
2279 }
2280
2281 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2287 self.streams.max_concurrent(dir)
2288 }
2289
2290 pub fn set_send_window(&mut self, send_window: u64) {
2292 self.streams.set_send_window(send_window);
2293 }
2294
2295 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2297 if self.streams.set_receive_window(receive_window) {
2298 self.spaces[SpaceId::Data].pending.max_data = true;
2299 }
2300 }
2301
2302 pub fn is_multipath_negotiated(&self) -> bool {
2307 !self.is_handshaking()
2308 && self.config.max_concurrent_multipath_paths.is_some()
2309 && self.peer_params.initial_max_path_id.is_some()
2310 }
2311
2312 fn on_ack_received(
2313 &mut self,
2314 now: Instant,
2315 space: SpaceId,
2316 ack: frame::Ack,
2317 ) -> Result<(), TransportError> {
2318 let path = PathId::ZERO;
2320 self.inner_on_ack_received(now, space, path, ack)
2321 }
2322
2323 fn on_path_ack_received(
2324 &mut self,
2325 now: Instant,
2326 space: SpaceId,
2327 path_ack: frame::PathAck,
2328 ) -> Result<(), TransportError> {
2329 let (ack, path) = path_ack.into_ack();
2330 self.inner_on_ack_received(now, space, path, ack)
2331 }
2332
2333 fn inner_on_ack_received(
2335 &mut self,
2336 now: Instant,
2337 space: SpaceId,
2338 path: PathId,
2339 ack: frame::Ack,
2340 ) -> Result<(), TransportError> {
2341 if self.abandoned_paths.contains(&path) {
2342 trace!("silently ignoring PATH_ACK on abandoned path");
2345 return Ok(());
2346 }
2347 if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2348 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2349 }
2350 let new_largest = {
2351 let space = &mut self.spaces[space].for_path(path);
2352 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2353 space.largest_acked_packet = Some(ack.largest);
2354 if let Some(info) = space.sent_packets.get(ack.largest) {
2355 space.largest_acked_packet_sent = info.time_sent;
2359 }
2360 true
2361 } else {
2362 false
2363 }
2364 };
2365
2366 if self.detect_spurious_loss(&ack, space, path) {
2367 self.path_data_mut(path)
2368 .congestion
2369 .on_spurious_congestion_event();
2370 }
2371
2372 let mut newly_acked = ArrayRangeSet::new();
2374 for range in ack.iter() {
2375 self.spaces[space].for_path(path).check_ack(range.clone())?;
2376 for (pn, _) in self.spaces[space]
2377 .for_path(path)
2378 .sent_packets
2379 .iter_range(range)
2380 {
2381 newly_acked.insert_one(pn);
2382 }
2383 }
2384
2385 if newly_acked.is_empty() {
2386 return Ok(());
2387 }
2388
2389 let mut ack_eliciting_acked = false;
2390 for packet in newly_acked.elts() {
2391 if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2392 for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2393 if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2399 pns.pending_acks.subtract_below(*acked_pn);
2400 }
2401 }
2402 ack_eliciting_acked |= info.ack_eliciting;
2403
2404 let path_data = self.path_data_mut(path);
2406 let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
2407 if mtu_updated {
2408 path_data
2409 .congestion
2410 .on_mtu_update(path_data.mtud.current_mtu());
2411 }
2412
2413 self.ack_frequency.on_acked(path, packet);
2415
2416 self.on_packet_acked(now, path, info);
2417 }
2418 }
2419
2420 let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
2421 let app_limited = self.app_limited;
2422 let path_data = self.path_data_mut(path);
2423 let in_flight = path_data.in_flight.bytes;
2424
2425 path_data
2426 .congestion
2427 .on_end_acks(now, in_flight, app_limited, largest_ackd);
2428
2429 if new_largest && ack_eliciting_acked {
2430 let ack_delay = if space != SpaceId::Data {
2431 Duration::from_micros(0)
2432 } else {
2433 cmp::min(
2434 self.ack_frequency.peer_max_ack_delay,
2435 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2436 )
2437 };
2438 let rtt = now.saturating_duration_since(
2439 self.spaces[space].for_path(path).largest_acked_packet_sent,
2440 );
2441
2442 let next_pn = self.spaces[space].for_path(path).next_packet_number;
2443 let path_data = self.path_data_mut(path);
2444 path_data.rtt.update(ack_delay, rtt);
2446 if path_data.first_packet_after_rtt_sample.is_none() {
2447 path_data.first_packet_after_rtt_sample = Some((space, next_pn));
2448 }
2449 }
2450
2451 self.detect_lost_packets(now, space, path, true);
2453
2454 if self.peer_completed_address_validation(path) {
2455 self.path_data_mut(path).pto_count = 0;
2456 }
2457
2458 if self.path_data(path).sending_ecn {
2463 if let Some(ecn) = ack.ecn {
2464 if new_largest {
2469 let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2470 self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2471 }
2472 } else {
2473 debug!("ECN not acknowledged by peer");
2475 self.path_data_mut(path).sending_ecn = false;
2476 }
2477 }
2478
2479 self.set_loss_detection_timer(now, path);
2480 Ok(())
2481 }
2482
2483 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2484 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2485
2486 if lost_packets.is_empty() {
2487 return false;
2488 }
2489
2490 for range in ack.iter() {
2491 let spurious_losses: Vec<u64> = lost_packets
2492 .iter_range(range.clone())
2493 .map(|(pn, _info)| pn)
2494 .collect();
2495
2496 for pn in spurious_losses {
2497 lost_packets.remove(pn);
2498 }
2499 }
2500
2501 lost_packets.is_empty()
2506 }
2507
2508 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2513 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2514
2515 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2516 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2517 }
2518
2519 fn process_ecn(
2521 &mut self,
2522 now: Instant,
2523 space: SpaceId,
2524 path: PathId,
2525 newly_acked: u64,
2526 ecn: frame::EcnCounts,
2527 largest_sent_time: Instant,
2528 ) {
2529 match self.spaces[space]
2530 .for_path(path)
2531 .detect_ecn(newly_acked, ecn)
2532 {
2533 Err(e) => {
2534 debug!("halting ECN due to verification failure: {}", e);
2535
2536 self.path_data_mut(path).sending_ecn = false;
2537 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2540 }
2541 Ok(false) => {}
2542 Ok(true) => {
2543 self.path_stats.entry(path).or_default().congestion_events += 1;
2544 self.path_data_mut(path).congestion.on_congestion_event(
2545 now,
2546 largest_sent_time,
2547 false,
2548 true,
2549 0,
2550 );
2551 }
2552 }
2553 }
2554
2555 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2558 self.paths
2559 .get_mut(&path_id)
2560 .expect("known path")
2561 .remove_in_flight(&info);
2562 let app_limited = self.app_limited;
2563 let path = self.path_data_mut(path_id);
2564 if info.ack_eliciting && !path.is_validating_path() {
2565 let rtt = path.rtt;
2568 path.congestion
2569 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2570 }
2571
2572 if let Some(retransmits) = info.retransmits.get() {
2574 for (id, _) in retransmits.reset_stream.iter() {
2575 self.streams.reset_acked(*id);
2576 }
2577 }
2578
2579 for frame in info.stream_frames {
2580 self.streams.received_ack_of(frame);
2581 }
2582 }
2583
2584 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2585 let start = if self.zero_rtt_crypto.is_some() {
2586 now
2587 } else {
2588 self.prev_crypto
2589 .as_ref()
2590 .expect("no previous keys")
2591 .end_packet
2592 .as_ref()
2593 .expect("update not acknowledged yet")
2594 .1
2595 };
2596
2597 self.timers.set(
2599 Timer::Conn(ConnTimer::KeyDiscard),
2600 start + self.pto_max_path(space, false) * 3,
2601 self.qlog.with_time(now),
2602 );
2603 }
2604
2605 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2618 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2619 self.detect_lost_packets(now, pn_space, path_id, false);
2621 self.set_loss_detection_timer(now, path_id);
2622 return;
2623 }
2624
2625 let (_, space) = match self.pto_time_and_space(now, path_id) {
2626 Some(x) => x,
2627 None => {
2628 error!(%path_id, "PTO expired while unset");
2629 return;
2630 }
2631 };
2632 trace!(
2633 in_flight = self.path_data(path_id).in_flight.bytes,
2634 count = self.path_data(path_id).pto_count,
2635 ?space,
2636 %path_id,
2637 "PTO fired"
2638 );
2639
2640 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2641 0 => {
2644 debug_assert!(!self.peer_completed_address_validation(path_id));
2645 1
2646 }
2647 _ => 2,
2649 };
2650 let pns = self.spaces[space].for_path(path_id);
2651 pns.loss_probes = pns.loss_probes.saturating_add(count);
2652 let path_data = self.path_data_mut(path_id);
2653 path_data.pto_count = path_data.pto_count.saturating_add(1);
2654 self.set_loss_detection_timer(now, path_id);
2655 }
2656
2657 fn detect_lost_packets(
2674 &mut self,
2675 now: Instant,
2676 pn_space: SpaceId,
2677 path_id: PathId,
2678 due_to_ack: bool,
2679 ) {
2680 let mut lost_packets = Vec::<u64>::new();
2681 let mut lost_mtu_probe = None;
2682 let mut in_persistent_congestion = false;
2683 let mut size_of_lost_packets = 0u64;
2684 self.spaces[pn_space].for_path(path_id).loss_time = None;
2685
2686 let path = self.path_data(path_id);
2689 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2690 let loss_delay = path
2691 .rtt
2692 .conservative()
2693 .mul_f32(self.config.time_threshold)
2694 .max(TIMER_GRANULARITY);
2695 let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
2696
2697 let largest_acked_packet = self.spaces[pn_space]
2698 .for_path(path_id)
2699 .largest_acked_packet
2700 .expect("detect_lost_packets only to be called if path received at least one ACK");
2701 let packet_threshold = self.config.packet_threshold as u64;
2702
2703 let congestion_period = self
2707 .pto(SpaceId::Data, path_id)
2708 .saturating_mul(self.config.persistent_congestion_threshold);
2709 let mut persistent_congestion_start: Option<Instant> = None;
2710 let mut prev_packet = None;
2711 let space = self.spaces[pn_space].for_path(path_id);
2712
2713 for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
2714 if prev_packet != Some(packet.wrapping_sub(1)) {
2715 persistent_congestion_start = None;
2717 }
2718
2719 let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
2723 if packet_too_old || largest_acked_packet >= packet + packet_threshold {
2724 if Some(packet) == in_flight_mtu_probe {
2726 lost_mtu_probe = in_flight_mtu_probe;
2729 } else {
2730 lost_packets.push(packet);
2731 size_of_lost_packets += info.size as u64;
2732 if info.ack_eliciting && due_to_ack {
2733 match persistent_congestion_start {
2734 Some(start) if info.time_sent - start > congestion_period => {
2737 in_persistent_congestion = true;
2738 }
2739 None if first_packet_after_rtt_sample
2741 .is_some_and(|x| x < (pn_space, packet)) =>
2742 {
2743 persistent_congestion_start = Some(info.time_sent);
2744 }
2745 _ => {}
2746 }
2747 }
2748 }
2749 } else {
2750 if space.loss_time.is_none() {
2752 space.loss_time = Some(info.time_sent + loss_delay);
2755 }
2756 persistent_congestion_start = None;
2757 }
2758
2759 prev_packet = Some(packet);
2760 }
2761
2762 self.handle_lost_packets(
2763 pn_space,
2764 path_id,
2765 now,
2766 lost_packets,
2767 lost_mtu_probe,
2768 loss_delay,
2769 in_persistent_congestion,
2770 size_of_lost_packets,
2771 );
2772 }
2773
2774 fn discard_path(&mut self, path_id: PathId, now: Instant) {
2776 trace!(%path_id, "dropping path state");
2777 let path = self.path_data(path_id);
2778 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2779
2780 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2782 .for_path(path_id)
2783 .sent_packets
2784 .iter()
2785 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2786 .map(|(pn, info)| {
2787 size_of_lost_packets += info.size as u64;
2788 pn
2789 })
2790 .collect();
2791
2792 if !lost_pns.is_empty() {
2793 trace!(
2794 %path_id,
2795 count = lost_pns.len(),
2796 lost_bytes = size_of_lost_packets,
2797 "packets lost on path abandon"
2798 );
2799 self.handle_lost_packets(
2800 SpaceId::Data,
2801 path_id,
2802 now,
2803 lost_pns,
2804 in_flight_mtu_probe,
2805 Duration::ZERO,
2806 false,
2807 size_of_lost_packets,
2808 );
2809 }
2810 self.paths.remove(&path_id);
2811 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2812
2813 let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2814 self.events.push_back(
2815 PathEvent::Abandoned {
2816 id: path_id,
2817 path_stats,
2818 }
2819 .into(),
2820 );
2821 }
2822
2823 fn handle_lost_packets(
2824 &mut self,
2825 pn_space: SpaceId,
2826 path_id: PathId,
2827 now: Instant,
2828 lost_packets: Vec<u64>,
2829 lost_mtu_probe: Option<u64>,
2830 loss_delay: Duration,
2831 in_persistent_congestion: bool,
2832 size_of_lost_packets: u64,
2833 ) {
2834 debug_assert!(
2835 {
2836 let mut sorted = lost_packets.clone();
2837 sorted.sort();
2838 sorted == lost_packets
2839 },
2840 "lost_packets must be sorted"
2841 );
2842
2843 self.drain_lost_packets(now, pn_space, path_id);
2844
2845 if let Some(largest_lost) = lost_packets.last().cloned() {
2847 let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
2848 let largest_lost_sent = self.spaces[pn_space]
2849 .for_path(path_id)
2850 .sent_packets
2851 .get(largest_lost)
2852 .unwrap()
2853 .time_sent;
2854 let path_stats = self.path_stats.entry(path_id).or_default();
2855 path_stats.lost_packets += lost_packets.len() as u64;
2856 path_stats.lost_bytes += size_of_lost_packets;
2857 trace!(
2858 %path_id,
2859 count = lost_packets.len(),
2860 lost_bytes = size_of_lost_packets,
2861 "packets lost",
2862 );
2863
2864 for &packet in &lost_packets {
2865 let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
2866 continue;
2867 };
2868 self.qlog
2869 .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
2870 self.paths
2871 .get_mut(&path_id)
2872 .unwrap()
2873 .remove_in_flight(&info);
2874
2875 for frame in info.stream_frames {
2876 self.streams.retransmit(frame);
2877 }
2878 self.spaces[pn_space].pending |= info.retransmits;
2879 self.path_data_mut(path_id)
2880 .mtud
2881 .on_non_probe_lost(packet, info.size);
2882
2883 self.spaces[pn_space].for_path(path_id).lost_packets.insert(
2884 packet,
2885 LostPacket {
2886 time_sent: info.time_sent,
2887 },
2888 );
2889 }
2890
2891 let path = self.path_data_mut(path_id);
2892 if path.mtud.black_hole_detected(now) {
2893 path.congestion.on_mtu_update(path.mtud.current_mtu());
2894 if let Some(max_datagram_size) = self.datagrams().max_size() {
2895 self.datagrams.drop_oversized(max_datagram_size);
2896 }
2897 self.path_stats
2898 .entry(path_id)
2899 .or_default()
2900 .black_holes_detected += 1;
2901 }
2902
2903 let lost_ack_eliciting =
2905 old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
2906
2907 if lost_ack_eliciting {
2908 self.path_stats
2909 .entry(path_id)
2910 .or_default()
2911 .congestion_events += 1;
2912 self.path_data_mut(path_id).congestion.on_congestion_event(
2913 now,
2914 largest_lost_sent,
2915 in_persistent_congestion,
2916 false,
2917 size_of_lost_packets,
2918 );
2919 }
2920 }
2921
2922 if let Some(packet) = lost_mtu_probe {
2924 let info = self.spaces[SpaceId::Data]
2925 .for_path(path_id)
2926 .take(packet)
2927 .unwrap(); self.paths
2930 .get_mut(&path_id)
2931 .unwrap()
2932 .remove_in_flight(&info);
2933 self.path_data_mut(path_id).mtud.on_probe_lost();
2934 self.path_stats
2935 .entry(path_id)
2936 .or_default()
2937 .lost_plpmtud_probes += 1;
2938 }
2939 }
2940
2941 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2947 SpaceId::iter()
2948 .filter_map(|id| {
2949 self.spaces[id]
2950 .number_spaces
2951 .get(&path_id)
2952 .and_then(|pns| pns.loss_time)
2953 .map(|time| (time, id))
2954 })
2955 .min_by_key(|&(time, _)| time)
2956 }
2957
2958 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2960 let path = self.path(path_id)?;
2961 let pto_count = path.pto_count;
2962 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2963 let mut duration = path.rtt.pto_base() * backoff;
2964
2965 if path_id == PathId::ZERO
2966 && path.in_flight.ack_eliciting == 0
2967 && !self.peer_completed_address_validation(PathId::ZERO)
2968 {
2969 let space = match self.highest_space {
2975 SpaceId::Handshake => SpaceId::Handshake,
2976 _ => SpaceId::Initial,
2977 };
2978
2979 return Some((now + duration, space));
2980 }
2981
2982 let mut result = None;
2983 for space in SpaceId::iter() {
2984 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2985 continue;
2986 };
2987
2988 if !pns.has_in_flight() {
2989 continue;
2990 }
2991 if space == SpaceId::Data {
2992 if self.is_handshaking() {
2994 return result;
2995 }
2996 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2998 }
2999 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3000 continue;
3001 };
3002 let pto = last_ack_eliciting + duration;
3003 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3004 if path.anti_amplification_blocked(1) {
3005 continue;
3007 }
3008 if path.in_flight.ack_eliciting == 0 {
3009 continue;
3011 }
3012 result = Some((pto, space));
3013 }
3014 }
3015 result
3016 }
3017
3018 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3019 if self.side.is_server() || self.state.is_closed() {
3021 return true;
3022 }
3023 self.spaces[SpaceId::Handshake]
3026 .path_space(PathId::ZERO)
3027 .and_then(|pns| pns.largest_acked_packet)
3028 .is_some()
3029 || self.spaces[SpaceId::Data]
3030 .path_space(path)
3031 .and_then(|pns| pns.largest_acked_packet)
3032 .is_some()
3033 || (self.spaces[SpaceId::Data].crypto.is_some()
3034 && self.spaces[SpaceId::Handshake].crypto.is_none())
3035 }
3036
3037 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3045 if self.state.is_closed() {
3046 return;
3050 }
3051
3052 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3053 self.timers.set(
3055 Timer::PerPath(path_id, PathTimer::LossDetection),
3056 loss_time,
3057 self.qlog.with_time(now),
3058 );
3059 return;
3060 }
3061
3062 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3065 self.timers.set(
3066 Timer::PerPath(path_id, PathTimer::LossDetection),
3067 timeout,
3068 self.qlog.with_time(now),
3069 );
3070 } else {
3071 self.timers.stop(
3072 Timer::PerPath(path_id, PathTimer::LossDetection),
3073 self.qlog.with_time(now),
3074 );
3075 }
3076 }
3077
3078 fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3084 match space {
3085 SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3086 SpaceId::Data => self
3087 .paths
3088 .iter()
3089 .filter_map(|(path_id, state)| {
3090 if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3091 None
3093 } else {
3094 let pto = self.pto(space, *path_id);
3095 Some(pto)
3096 }
3097 })
3098 .max()
3099 .expect("there should be one at least path"),
3100 }
3101 }
3102
3103 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3108 let max_ack_delay = match space {
3109 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3110 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3111 };
3112 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3113 }
3114
3115 fn on_packet_authenticated(
3116 &mut self,
3117 now: Instant,
3118 space_id: SpaceId,
3119 path_id: PathId,
3120 ecn: Option<EcnCodepoint>,
3121 packet: Option<u64>,
3122 spin: bool,
3123 is_1rtt: bool,
3124 ) {
3125 self.total_authed_packets += 1;
3126 if let Some(last_allowed_receive) = self
3127 .paths
3128 .get(&path_id)
3129 .and_then(|path| path.data.last_allowed_receive)
3130 {
3131 if now > last_allowed_receive {
3132 warn!("received data on path which we abandoned more than 3 * PTO ago");
3133 if !self.state.is_closed() {
3135 self.state.move_to_closed(TransportError::NO_ERROR(
3137 "peer failed to respond with PATH_ABANDON in time",
3138 ));
3139 self.close_common();
3140 self.set_close_timer(now);
3141 self.close = true;
3142 }
3143 return;
3144 }
3145 }
3146
3147 self.reset_keep_alive(path_id, now);
3148 self.reset_idle_timeout(now, space_id, path_id);
3149 self.permit_idle_reset = true;
3150 self.receiving_ecn |= ecn.is_some();
3151 if let Some(x) = ecn {
3152 let space = &mut self.spaces[space_id];
3153 space.for_path(path_id).ecn_counters += x;
3154
3155 if x.is_ce() {
3156 space
3157 .for_path(path_id)
3158 .pending_acks
3159 .set_immediate_ack_required();
3160 }
3161 }
3162
3163 let packet = match packet {
3164 Some(x) => x,
3165 None => return,
3166 };
3167 match &self.side {
3168 ConnectionSide::Client { .. } => {
3169 if space_id == SpaceId::Handshake {
3173 if let Some(hs) = self.state.as_handshake_mut() {
3174 hs.allow_server_migration = false;
3175 }
3176 }
3177 }
3178 ConnectionSide::Server { .. } => {
3179 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3180 {
3181 self.discard_space(now, SpaceId::Initial);
3183 }
3184 if self.zero_rtt_crypto.is_some() && is_1rtt {
3185 self.set_key_discard_timer(now, space_id)
3187 }
3188 }
3189 }
3190 let space = self.spaces[space_id].for_path(path_id);
3191 space.pending_acks.insert_one(packet, now);
3192 if packet >= space.rx_packet.unwrap_or_default() {
3193 space.rx_packet = Some(packet);
3194 self.spin = self.side.is_client() ^ spin;
3196 }
3197 }
3198
3199 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3204 if let Some(timeout) = self.idle_timeout {
3206 if self.state.is_closed() {
3207 self.timers
3208 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3209 } else {
3210 let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3211 self.timers.set(
3212 Timer::Conn(ConnTimer::Idle),
3213 now + dt,
3214 self.qlog.with_time(now),
3215 );
3216 }
3217 }
3218
3219 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3221 if self.state.is_closed() {
3222 self.timers.stop(
3223 Timer::PerPath(path_id, PathTimer::PathIdle),
3224 self.qlog.with_time(now),
3225 );
3226 } else {
3227 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3228 self.timers.set(
3229 Timer::PerPath(path_id, PathTimer::PathIdle),
3230 now + dt,
3231 self.qlog.with_time(now),
3232 );
3233 }
3234 }
3235 }
3236
3237 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3239 if !self.state.is_established() {
3240 return;
3241 }
3242
3243 if let Some(interval) = self.config.keep_alive_interval {
3244 self.timers.set(
3245 Timer::Conn(ConnTimer::KeepAlive),
3246 now + interval,
3247 self.qlog.with_time(now),
3248 );
3249 }
3250
3251 if let Some(interval) = self.path_data(path_id).keep_alive {
3252 self.timers.set(
3253 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3254 now + interval,
3255 self.qlog.with_time(now),
3256 );
3257 }
3258 }
3259
3260 fn reset_cid_retirement(&mut self, now: Instant) {
3262 if let Some((_path, t)) = self.next_cid_retirement() {
3263 self.timers.set(
3264 Timer::Conn(ConnTimer::PushNewCid),
3265 t,
3266 self.qlog.with_time(now),
3267 );
3268 }
3269 }
3270
3271 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3273 self.local_cid_state
3274 .iter()
3275 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3276 .min_by_key(|(_path_id, timeout)| *timeout)
3277 }
3278
3279 pub(crate) fn handle_first_packet(
3284 &mut self,
3285 now: Instant,
3286 remote: SocketAddr,
3287 ecn: Option<EcnCodepoint>,
3288 packet_number: u64,
3289 packet: InitialPacket,
3290 remaining: Option<BytesMut>,
3291 ) -> Result<(), ConnectionError> {
3292 let span = trace_span!("first recv");
3293 let _guard = span.enter();
3294 debug_assert!(self.side.is_server());
3295 let len = packet.header_data.len() + packet.payload.len();
3296 let path_id = PathId::ZERO;
3297 self.path_data_mut(path_id).total_recvd = len as u64;
3298
3299 if let Some(hs) = self.state.as_handshake_mut() {
3300 hs.expected_token = packet.header.token.clone();
3301 } else {
3302 unreachable!("first packet must be delivered in Handshake state");
3303 }
3304
3305 self.on_packet_authenticated(
3307 now,
3308 SpaceId::Initial,
3309 path_id,
3310 ecn,
3311 Some(packet_number),
3312 false,
3313 false,
3314 );
3315
3316 let packet: Packet = packet.into();
3317
3318 let mut qlog = QlogRecvPacket::new(len);
3319 qlog.header(&packet.header, Some(packet_number), path_id);
3320
3321 self.process_decrypted_packet(
3322 now,
3323 remote,
3324 path_id,
3325 Some(packet_number),
3326 packet,
3327 &mut qlog,
3328 )?;
3329 self.qlog.emit_packet_received(qlog, now);
3330 if let Some(data) = remaining {
3331 self.handle_coalesced(now, remote, path_id, ecn, data);
3332 }
3333
3334 self.qlog.emit_recovery_metrics(
3335 path_id,
3336 &mut self.paths.get_mut(&path_id).unwrap().data,
3337 now,
3338 );
3339
3340 Ok(())
3341 }
3342
3343 fn init_0rtt(&mut self, now: Instant) {
3344 let (header, packet) = match self.crypto.early_crypto() {
3345 Some(x) => x,
3346 None => return,
3347 };
3348 if self.side.is_client() {
3349 match self.crypto.transport_parameters() {
3350 Ok(params) => {
3351 let params = params
3352 .expect("crypto layer didn't supply transport parameters with ticket");
3353 let params = TransportParameters {
3355 initial_src_cid: None,
3356 original_dst_cid: None,
3357 preferred_address: None,
3358 retry_src_cid: None,
3359 stateless_reset_token: None,
3360 min_ack_delay: None,
3361 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3362 max_ack_delay: TransportParameters::default().max_ack_delay,
3363 initial_max_path_id: None,
3364 ..params
3365 };
3366 self.set_peer_params(params);
3367 self.qlog.emit_peer_transport_params_restored(self, now);
3368 }
3369 Err(e) => {
3370 error!("session ticket has malformed transport parameters: {}", e);
3371 return;
3372 }
3373 }
3374 }
3375 trace!("0-RTT enabled");
3376 self.zero_rtt_enabled = true;
3377 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3378 }
3379
3380 fn read_crypto(
3381 &mut self,
3382 space: SpaceId,
3383 crypto: &frame::Crypto,
3384 payload_len: usize,
3385 ) -> Result<(), TransportError> {
3386 let expected = if !self.state.is_handshake() {
3387 SpaceId::Data
3388 } else if self.highest_space == SpaceId::Initial {
3389 SpaceId::Initial
3390 } else {
3391 SpaceId::Handshake
3394 };
3395 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3399
3400 let end = crypto.offset + crypto.data.len() as u64;
3401 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3402 warn!(
3403 "received new {:?} CRYPTO data when expecting {:?}",
3404 space, expected
3405 );
3406 return Err(TransportError::PROTOCOL_VIOLATION(
3407 "new data at unexpected encryption level",
3408 ));
3409 }
3410
3411 let space = &mut self.spaces[space];
3412 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3413 if max > self.config.crypto_buffer_size as u64 {
3414 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3415 }
3416
3417 space
3418 .crypto_stream
3419 .insert(crypto.offset, crypto.data.clone(), payload_len);
3420 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3421 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3422 if self.crypto.read_handshake(&chunk.bytes)? {
3423 self.events.push_back(Event::HandshakeDataReady);
3424 }
3425 }
3426
3427 Ok(())
3428 }
3429
3430 fn write_crypto(&mut self) {
3431 loop {
3432 let space = self.highest_space;
3433 let mut outgoing = Vec::new();
3434 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3435 match space {
3436 SpaceId::Initial => {
3437 self.upgrade_crypto(SpaceId::Handshake, crypto);
3438 }
3439 SpaceId::Handshake => {
3440 self.upgrade_crypto(SpaceId::Data, crypto);
3441 }
3442 _ => unreachable!("got updated secrets during 1-RTT"),
3443 }
3444 }
3445 if outgoing.is_empty() {
3446 if space == self.highest_space {
3447 break;
3448 } else {
3449 continue;
3451 }
3452 }
3453 let offset = self.spaces[space].crypto_offset;
3454 let outgoing = Bytes::from(outgoing);
3455 if let Some(hs) = self.state.as_handshake_mut() {
3456 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3457 hs.client_hello = Some(outgoing.clone());
3458 }
3459 }
3460 self.spaces[space].crypto_offset += outgoing.len() as u64;
3461 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3462 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3463 offset,
3464 data: outgoing,
3465 });
3466 }
3467 }
3468
3469 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3471 debug_assert!(
3472 self.spaces[space].crypto.is_none(),
3473 "already reached packet space {space:?}"
3474 );
3475 trace!("{:?} keys ready", space);
3476 if space == SpaceId::Data {
3477 self.next_crypto = Some(
3479 self.crypto
3480 .next_1rtt_keys()
3481 .expect("handshake should be complete"),
3482 );
3483 }
3484
3485 self.spaces[space].crypto = Some(crypto);
3486 debug_assert!(space as usize > self.highest_space as usize);
3487 self.highest_space = space;
3488 if space == SpaceId::Data && self.side.is_client() {
3489 self.zero_rtt_crypto = None;
3491 }
3492 }
3493
3494 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3495 debug_assert!(space_id != SpaceId::Data);
3496 trace!("discarding {:?} keys", space_id);
3497 if space_id == SpaceId::Initial {
3498 if let ConnectionSide::Client { token, .. } = &mut self.side {
3500 *token = Bytes::new();
3501 }
3502 }
3503 let space = &mut self.spaces[space_id];
3504 space.crypto = None;
3505 let pns = space.for_path(PathId::ZERO);
3506 pns.time_of_last_ack_eliciting_packet = None;
3507 pns.loss_time = None;
3508 pns.loss_probes = 0;
3509 let sent_packets = mem::take(&mut pns.sent_packets);
3510 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3511 for (_, packet) in sent_packets.into_iter() {
3512 path.data.remove_in_flight(&packet);
3513 }
3514
3515 self.set_loss_detection_timer(now, PathId::ZERO)
3516 }
3517
3518 fn handle_coalesced(
3519 &mut self,
3520 now: Instant,
3521 remote: SocketAddr,
3522 path_id: PathId,
3523 ecn: Option<EcnCodepoint>,
3524 data: BytesMut,
3525 ) {
3526 self.path_data_mut(path_id)
3527 .inc_total_recvd(data.len() as u64);
3528 let mut remaining = Some(data);
3529 let cid_len = self
3530 .local_cid_state
3531 .values()
3532 .map(|cid_state| cid_state.cid_len())
3533 .next()
3534 .expect("one cid_state must exist");
3535 while let Some(data) = remaining {
3536 match PartialDecode::new(
3537 data,
3538 &FixedLengthConnectionIdParser::new(cid_len),
3539 &[self.version],
3540 self.endpoint_config.grease_quic_bit,
3541 ) {
3542 Ok((partial_decode, rest)) => {
3543 remaining = rest;
3544 self.handle_decode(now, remote, path_id, ecn, partial_decode);
3545 }
3546 Err(e) => {
3547 trace!("malformed header: {}", e);
3548 return;
3549 }
3550 }
3551 }
3552 }
3553
3554 fn handle_decode(
3555 &mut self,
3556 now: Instant,
3557 remote: SocketAddr,
3558 path_id: PathId,
3559 ecn: Option<EcnCodepoint>,
3560 partial_decode: PartialDecode,
3561 ) {
3562 let qlog = QlogRecvPacket::new(partial_decode.len());
3563 if let Some(decoded) = packet_crypto::unprotect_header(
3564 partial_decode,
3565 &self.spaces,
3566 self.zero_rtt_crypto.as_ref(),
3567 self.peer_params.stateless_reset_token,
3568 ) {
3569 self.handle_packet(
3570 now,
3571 remote,
3572 path_id,
3573 ecn,
3574 decoded.packet,
3575 decoded.stateless_reset,
3576 qlog,
3577 );
3578 }
3579 }
3580
    /// Handles a single packet after header unprotection.
    ///
    /// The packet is decrypted, checked for duplicates and handshake-specific validity,
    /// accounted via [`Self::on_packet_authenticated`] and processed by
    /// [`Self::process_decrypted_packet`]. Any resulting error is translated into a
    /// connection state transition, followed by the closed/drained bookkeeping.
    ///
    /// - `packet`: the unprotected packet, or `None` if there is none to decrypt.
    /// - `stateless_reset`: when set, the packet is treated as a stateless reset
    ///   regardless of the decryption outcome.
    /// - `qlog`: the receive record for this packet; it is emitted on the paths that
    ///   reach packet processing or discard an authenticated packet.
    fn handle_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        path_id: PathId,
        ecn: Option<EcnCodepoint>,
        packet: Option<Packet>,
        stateless_reset: bool,
        mut qlog: QlogRecvPacket,
    ) {
        self.stats.udp_rx.ios += 1;
        if let Some(ref packet) = packet {
            trace!(
                "got {:?} packet ({} bytes) from {} using id {}",
                packet.header.space(),
                packet.payload.len() + packet.header_data.len(),
                remote,
                packet.header.dst_cid(),
            );
        }

        if self.is_handshaking() {
            // Only the initial path is usable during the handshake.
            if path_id != PathId::ZERO {
                debug!(%remote, %path_id, "discarding multipath packet during handshake");
                return;
            }
            // A different remote is only accepted while the handshake state still allows
            // the server to migrate; otherwise the packet is dropped.
            if remote != self.path_data_mut(path_id).remote {
                if let Some(hs) = self.state.as_handshake() {
                    if hs.allow_server_migration {
                        trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
                        self.path_data_mut(path_id).remote = remote;
                        self.qlog.emit_tuple_assigned(path_id, remote, now);
                    } else {
                        debug!("discarding packet with unexpected remote during handshake");
                        return;
                    }
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            }
        }

        // Snapshot the state so transitions caused by this packet can be detected below.
        let was_closed = self.state.is_closed();
        let was_drained = self.state.is_drained();

        // `Err(None)` denotes an authentication failure (or no packet at all);
        // `Err(Some(_))` carries an error reported by decryption.
        let decrypted = match packet {
            None => Err(None),
            Some(mut packet) => self
                .decrypt_packet(now, path_id, &mut packet)
                .map(move |number| (packet, number)),
        };
        let result = match decrypted {
            // A stateless reset takes precedence over whatever decryption produced.
            _ if stateless_reset => {
                debug!("got stateless reset");
                Err(ConnectionError::Reset)
            }
            Err(Some(e)) => {
                warn!("illegal packet: {}", e);
                Err(e.into())
            }
            Err(None) => {
                debug!("failed to authenticate packet");
                self.authentication_failures += 1;
                // The limit is taken from the local packet key of the highest space.
                let integrity_limit = self.spaces[self.highest_space]
                    .crypto
                    .as_ref()
                    .unwrap()
                    .packet
                    .local
                    .integrity_limit();
                if self.authentication_failures > integrity_limit {
                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
                } else {
                    // Below the limit the packet is silently dropped.
                    return;
                }
            }
            Ok((packet, number)) => {
                qlog.header(&packet.header, number, path_id);
                let span = match number {
                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                    None => trace_span!("recv", space = ?packet.header.space()),
                };
                let _guard = span.enter();

                // Duplicate detection only applies when the packet has a number and the
                // path already has a packet number space in this space.
                let dedup = self.spaces[packet.header.space()]
                    .path_space_mut(path_id)
                    .map(|pns| &mut pns.dedup);
                if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                    debug!("discarding possible duplicate packet");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else if self.state.is_handshake() && packet.header.is_short() {
                    trace!("dropping short packet during handshake");
                    self.qlog.emit_packet_received(qlog, now);
                    return;
                } else {
                    // A server in the handshake state drops Initial packets whose token
                    // differs from the expected one.
                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                        if let Some(hs) = self.state.as_handshake() {
                            if self.side.is_server() && token != &hs.expected_token {
                                warn!("discarding Initial with invalid retry token");
                                self.qlog.emit_packet_received(qlog, now);
                                return;
                            }
                        }
                    }

                    if !self.state.is_closed() {
                        // Only short-header packets carry a spin bit.
                        let spin = match packet.header {
                            Header::Short { spin, .. } => spin,
                            _ => false,
                        };

                        // Servers make sure the path exists, unless it was abandoned.
                        if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                            self.ensure_path(path_id, remote, now, number);
                        }
                        // Per-packet bookkeeping is skipped for unknown paths.
                        if self.paths.contains_key(&path_id) {
                            self.on_packet_authenticated(
                                now,
                                packet.header.space(),
                                path_id,
                                ecn,
                                number,
                                spin,
                                packet.header.is_1rtt(),
                            );
                        }
                    }

                    let res = self
                        .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);

                    self.qlog.emit_packet_received(qlog, now);
                    res
                }
            }
        };

        // Translate a processing error into the matching state transition.
        if let Err(conn_err) = result {
            match conn_err {
                ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
                ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
                // Resets and an exceeded AEAD limit go straight to the drained state.
                ConnectionError::Reset
                | ConnectionError::TransportError(TransportError {
                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
                    ..
                }) => {
                    self.state.move_to_drained(Some(conn_err));
                }
                ConnectionError::TimedOut => {
                    unreachable!("timeouts aren't generated by packet processing");
                }
                ConnectionError::TransportError(err) => {
                    debug!("closing connection due to transport error: {}", err);
                    self.state.move_to_closed(err);
                }
                ConnectionError::VersionMismatch => {
                    self.state.move_to_draining(Some(conn_err));
                }
                ConnectionError::LocallyClosed => {
                    unreachable!("LocallyClosed isn't generated by packet processing");
                }
                ConnectionError::CidsExhausted => {
                    unreachable!("CidsExhausted isn't generated by packet processing");
                }
            };
        }

        // This packet closed the connection: run the common close logic and, unless the
        // connection is already drained, start the close timer.
        if !was_closed && self.state.is_closed() {
            self.close_common();
            if !self.state.is_drained() {
                self.set_close_timer(now);
            }
        }
        // This packet drained the connection: notify the endpoint and stop the close timer.
        if !was_drained && self.state.is_drained() {
            self.endpoint_events.push_back(EndpointEventInner::Drained);
            self.timers
                .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
        }

        // In the `Closed` state, `close` is set only when the packet came from the
        // path's current remote; an unknown path falls back to `remote` itself.
        // NOTE(review): `close` presumably requests (re)sending a close frame — confirm.
        if matches!(self.state.as_type(), StateType::Closed) {
            let path_remote = self
                .paths
                .get(&path_id)
                .map(|p| p.data.remote)
                .unwrap_or(remote);
            self.close = remote == path_remote;
        }
    }
3782
    /// Processes a decrypted packet according to the current connection state.
    ///
    /// - Unknown path: the packet is discarded.
    /// - `Established`: Data packets go to `process_payload`, other packets with frames
    ///   go to `process_early_payload`.
    /// - `Closed`: frames are only scanned for a close frame, which moves the connection
    ///   to draining.
    /// - `Draining` / `Drained`: the packet is ignored.
    /// - `Handshake`: the packet is dispatched on its header type (Retry, Handshake,
    ///   Initial, 0-RTT, Version Negotiation).
    ///
    /// # Errors
    ///
    /// Returns the error raised by frame processing or transport parameter handling, a
    /// `PROTOCOL_VIOLATION` if a server receives a Retry, or
    /// `ConnectionError::VersionMismatch` when a Version Negotiation packet does not
    /// list our version.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) if a short-header packet arrives here during the
    /// handshake, and on the `unwrap`s below when `number` is `None` for a Data/0-RTT
    /// packet.
    fn process_decrypted_packet(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        path_id: PathId,
        number: Option<u64>,
        packet: Packet,
        qlog: &mut QlogRecvPacket,
    ) -> Result<(), ConnectionError> {
        if !self.paths.contains_key(&path_id) {
            trace!(%path_id, ?number, "discarding packet for unknown path");
            return Ok(());
        }
        // Every state except `Handshake` is handled completely inside this match; only
        // the handshake state falls through to the header dispatch below.
        let state = match self.state.as_type() {
            StateType::Established => {
                match packet.header.space() {
                    SpaceId::Data => {
                        self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
                    }
                    _ if packet.header.has_frames() => {
                        self.process_early_payload(now, path_id, packet, qlog)?
                    }
                    _ => {
                        trace!("discarding unexpected pre-handshake packet");
                    }
                }
                return Ok(());
            }
            StateType::Closed => {
                // While closed, frames are only inspected for a close frame; frames that
                // fail to decode are skipped.
                for result in frame::Iter::new(packet.payload.freeze())? {
                    let frame = match result {
                        Ok(frame) => frame,
                        Err(err) => {
                            debug!("frame decoding error: {err:?}");
                            continue;
                        }
                    };
                    qlog.frame(&frame);

                    if let Frame::Padding = frame {
                        continue;
                    };

                    self.stats.frame_rx.record(&frame);

                    if let Frame::Close(_error) = frame {
                        self.state.move_to_draining(None);
                        break;
                    }
                }
                return Ok(());
            }
            StateType::Draining | StateType::Drained => return Ok(()),
            StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
        };

        match packet.header {
            Header::Retry {
                src_cid: rem_cid, ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if self.side.is_server() {
                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
                }

                // Validate the Retry against the currently active remote CID of the path;
                // a missing CID entry counts as invalid.
                let is_valid_retry = self
                    .rem_cids
                    .get(&path_id)
                    .map(|cids| cids.active())
                    .map(|orig_dst_cid| {
                        self.crypto.is_valid_retry(
                            orig_dst_cid,
                            &packet.header_data,
                            &packet.payload,
                        )
                    })
                    .unwrap_or_default();
                // A Retry is ignored once another packet has been authenticated, if its
                // payload is at most 16 bytes, or if validation failed.
                // NOTE(review): the 16 bytes are presumably the integrity tag — the
                // token length below is computed as `payload.len() - 16`.
                if self.total_authed_packets > 1
                    || packet.payload.len() <= 16
                    || !is_valid_retry
                {
                    trace!("discarding invalid Retry");
                    return Ok(());
                }

                trace!("retrying with CID {}", rem_cid);
                let client_hello = state.client_hello.take().unwrap();
                self.retry_src_cid = Some(rem_cid);
                self.rem_cids
                    .get_mut(&path_id)
                    .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
                    .update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;

                // Packet number 0 of the Initial space, if still tracked, is handed to
                // `on_packet_acked`.
                let space = &mut self.spaces[SpaceId::Initial];
                if let Some(info) = space.for_path(PathId::ZERO).take(0) {
                    self.on_packet_acked(now, PathId::ZERO, info);
                };

                self.discard_space(now, SpaceId::Initial);
                // Rebuild the Initial space with keys derived from the new CID, keep the
                // next packet number of the old space, and queue the ClientHello again.
                self.spaces[SpaceId::Initial] = {
                    let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
                    space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
                    space.crypto_offset = client_hello.len() as u64;
                    space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
                        .for_path(path_id)
                        .next_packet_number;
                    space.pending.crypto.push_back(frame::Crypto {
                        offset: 0,
                        data: client_hello,
                    });
                    space
                };

                // Everything already sent in the Data space is taken out of flight and
                // queued for retransmission.
                let zero_rtt = mem::take(
                    &mut self.spaces[SpaceId::Data]
                        .for_path(PathId::ZERO)
                        .sent_packets,
                );
                for (_, info) in zero_rtt.into_iter() {
                    self.paths
                        .get_mut(&PathId::ZERO)
                        .unwrap()
                        .remove_in_flight(&info);
                    self.spaces[SpaceId::Data].pending |= info.retransmits;
                }
                self.streams.retransmit_all_for_0rtt();

                // The Retry token is the payload without its last 16 bytes.
                let token_len = packet.payload.len() - 16;
                let ConnectionSide::Client { ref mut token, .. } = self.side else {
                    unreachable!("we already short-circuited if we're server");
                };
                *token = packet.payload.freeze().split_to(token_len);

                // Restart the handshake from a fresh handshake state.
                self.state = State::handshake(state::Handshake {
                    expected_token: Bytes::new(),
                    rem_cid_set: false,
                    client_hello: None,
                    allow_server_migration: true,
                });
                Ok(())
            }
            Header::Long {
                ty: LongType::Handshake,
                src_cid: rem_cid,
                dst_cid: loc_cid,
                ..
            } => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }
                self.on_path_validated(path_id);

                self.process_early_payload(now, path_id, packet, qlog)?;
                if self.state.is_closed() {
                    return Ok(());
                }

                if self.crypto.is_handshaking() {
                    trace!("handshake ongoing");
                    return Ok(());
                }

                // From here on the crypto handshake has finished.
                if self.side.is_client() {
                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                        TransportError::new(
                            TransportErrorCode::crypto(0x6d),
                            "transport parameters missing".to_owned(),
                        )
                    })?;

                    if self.has_0rtt() {
                        if !self.crypto.early_data_accepted().unwrap() {
                            debug_assert!(self.side.is_client());
                            debug!("0-RTT rejected");
                            self.accepted_0rtt = false;
                            self.streams.zero_rtt_rejected();

                            // Pending retransmits of the Data space are discarded.
                            self.spaces[SpaceId::Data].pending = Retransmits::default();

                            // Packets sent in the Data space are dropped from tracking
                            // and from the in-flight accounting.
                            let sent_packets = mem::take(
                                &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
                            );
                            for (_, packet) in sent_packets.into_iter() {
                                self.paths
                                    .get_mut(&path_id)
                                    .unwrap()
                                    .remove_in_flight(&packet);
                            }
                        } else {
                            self.accepted_0rtt = true;
                            // The new parameters are validated against the ones that
                            // were in use so far.
                            params.validate_resumption_from(&self.peer_params)?;
                        }
                    }
                    if let Some(token) = params.stateless_reset_token {
                        let remote = self.path_data(path_id).remote;
                        self.endpoint_events
                            .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
                    }
                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                    self.issue_first_cids(now);
                } else {
                    // Server side: queue HANDSHAKE_DONE, drop the Handshake space and
                    // report the handshake as confirmed.
                    self.spaces[SpaceId::Data].pending.handshake_done = true;
                    self.discard_space(now, SpaceId::Handshake);
                    self.events.push_back(Event::HandshakeConfirmed);
                    trace!("handshake confirmed");
                }

                self.events.push_back(Event::Connected);
                self.state.move_to_established();
                trace!("established");

                self.issue_first_path_cids(now);
                Ok(())
            }
            Header::Initial(InitialHeader {
                src_cid: rem_cid,
                dst_cid: loc_cid,
                ..
            }) => {
                debug_assert_eq!(path_id, PathId::ZERO);
                if !state.rem_cid_set {
                    // The first Initial fixes the remote CID used for the handshake.
                    trace!("switching remote CID to {}", rem_cid);
                    let mut state = state.clone();
                    self.rem_cids
                        .get_mut(&path_id)
                        .expect("PathId::ZERO not yet abandoned")
                        .update_initial_cid(rem_cid);
                    self.rem_handshake_cid = rem_cid;
                    self.orig_rem_cid = rem_cid;
                    state.rem_cid_set = true;
                    self.state.move_to_handshake(state);
                } else if rem_cid != self.rem_handshake_cid {
                    debug!(
                        "discarding packet with mismatched remote CID: {} != {}",
                        self.rem_handshake_cid, rem_cid
                    );
                    return Ok(());
                }

                let starting_space = self.highest_space;
                self.process_early_payload(now, path_id, packet, qlog)?;

                // A server that just left the Initial space reads the peer's transport
                // parameters, issues its first CIDs and tries to enable 0-RTT.
                if self.side.is_server()
                    && starting_space == SpaceId::Initial
                    && self.highest_space != SpaceId::Initial
                {
                    let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                        TransportError::new(
                            TransportErrorCode::crypto(0x6d),
                            "transport parameters missing".to_owned(),
                        )
                    })?;
                    self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                    self.issue_first_cids(now);
                    self.init_0rtt(now);
                }
                Ok(())
            }
            Header::Long {
                ty: LongType::ZeroRtt,
                ..
            } => {
                self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
                Ok(())
            }
            Header::VersionNegotiate { .. } => {
                // Version negotiation is ignored once another packet was authenticated.
                if self.total_authed_packets > 1 {
                    return Ok(());
                }
                // The payload is read as a list of 4-byte big-endian versions; a
                // trailing partial chunk never matches.
                let supported = packet
                    .payload
                    .chunks(4)
                    .any(|x| match <[u8; 4]>::try_from(x) {
                        Ok(version) => self.version == u32::from_be_bytes(version),
                        Err(_) => false,
                    });
                // A packet that lists our own version is ignored.
                if supported {
                    return Ok(());
                }
                debug!("remote doesn't support our version");
                Err(ConnectionError::VersionMismatch)
            }
            Header::Short { .. } => unreachable!(
                "short packets received during handshake are discarded in handle_packet"
            ),
        }
    }
4094
    /// Processes the frames of an Initial or Handshake packet on `PathId::ZERO`.
    ///
    /// Accepted frames are padding, ping, crypto, ack, path-ack and close. After the
    /// frames are handled, an immediate ACK is requested if any frame was ack-eliciting,
    /// and pending handshake output is written via [`Self::write_crypto`].
    ///
    /// # Errors
    ///
    /// Returns a frame decoding error, a `PROTOCOL_VIOLATION` for any frame type not
    /// allowed here, or the error raised while handling a crypto or ack frame.
    fn process_early_payload(
        &mut self,
        now: Instant,
        path_id: PathId,
        packet: Packet,
        #[allow(unused)] qlog: &mut QlogRecvPacket,
    ) -> Result<(), TransportError> {
        debug_assert_ne!(packet.header.space(), SpaceId::Data);
        debug_assert_eq!(path_id, PathId::ZERO);
        let payload_len = packet.payload.len();
        let mut ack_eliciting = false;
        for result in frame::Iter::new(packet.payload.freeze())? {
            let frame = result?;
            qlog.frame(&frame);
            // Padding is skipped before any stats are recorded or a span is created.
            let span = match frame {
                Frame::Padding => continue,
                _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
            };

            self.stats.frame_rx.record(&frame);

            let _guard = span.as_ref().map(|x| x.enter());
            ack_eliciting |= frame.is_ack_eliciting();

            // Frames flagged as 1-RTT-only are rejected outside the Data space.
            if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
                return Err(TransportError::PROTOCOL_VIOLATION(
                    "illegal frame type in handshake",
                ));
            }

            match frame {
                Frame::Padding | Frame::Ping => {}
                Frame::Crypto(frame) => {
                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
                }
                Frame::Ack(ack) => {
                    self.on_ack_received(now, packet.header.space(), ack)?;
                }
                Frame::PathAck(ack) => {
                    // Record the acknowledged path on the tracing span.
                    span.as_ref()
                        .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
                    self.on_path_ack_received(now, packet.header.space(), ack)?;
                }
                Frame::Close(reason) => {
                    // A close frame ends processing of this packet immediately.
                    self.state.move_to_draining(Some(reason.into()));
                    return Ok(());
                }
                _ => {
                    // Any other frame is a protocol violation; the offending frame type
                    // is attached to the error.
                    let mut err =
                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
                    err.frame = frame::MaybeFrame::Known(frame.ty());
                    return Err(err);
                }
            }
        }

        if ack_eliciting {
            self.spaces[packet.header.space()]
                .for_path(path_id)
                .pending_acks
                .set_immediate_ack_required();
        }

        self.write_crypto();
        Ok(())
    }
4164
4165 fn process_payload(
4167 &mut self,
4168 now: Instant,
4169 remote: SocketAddr,
4170 path_id: PathId,
4171 number: u64,
4172 packet: Packet,
4173 #[allow(unused)] qlog: &mut QlogRecvPacket,
4174 ) -> Result<(), TransportError> {
4175 let payload = packet.payload.freeze();
4176 let mut is_probing_packet = true;
4177 let mut close = None;
4178 let payload_len = payload.len();
4179 let mut ack_eliciting = false;
4180 let mut migration_observed_addr = None;
4183 for result in frame::Iter::new(payload)? {
4184 let frame = result?;
4185 qlog.frame(&frame);
4186 let span = match frame {
4187 Frame::Padding => continue,
4188 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4189 };
4190
4191 self.stats.frame_rx.record(&frame);
4192 match &frame {
4195 Frame::Crypto(f) => {
4196 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4197 }
4198 Frame::Stream(f) => {
4199 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4200 }
4201 Frame::Datagram(f) => {
4202 trace!(len = f.data.len(), "got datagram frame");
4203 }
4204 f => {
4205 trace!("got frame {f}");
4206 }
4207 }
4208
4209 let _guard = span.enter();
4210 if packet.header.is_0rtt() {
4211 match frame {
4212 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4213 return Err(TransportError::PROTOCOL_VIOLATION(
4214 "illegal frame type in 0-RTT",
4215 ));
4216 }
4217 _ => {
4218 if frame.is_1rtt() {
4219 return Err(TransportError::PROTOCOL_VIOLATION(
4220 "illegal frame type in 0-RTT",
4221 ));
4222 }
4223 }
4224 }
4225 }
4226 ack_eliciting |= frame.is_ack_eliciting();
4227
4228 match frame {
4230 Frame::Padding
4231 | Frame::PathChallenge(_)
4232 | Frame::PathResponse(_)
4233 | Frame::NewConnectionId(_)
4234 | Frame::ObservedAddr(_) => {}
4235 _ => {
4236 is_probing_packet = false;
4237 }
4238 }
4239
4240 match frame {
4241 Frame::Crypto(frame) => {
4242 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4243 }
4244 Frame::Stream(frame) => {
4245 if self.streams.received(frame, payload_len)?.should_transmit() {
4246 self.spaces[SpaceId::Data].pending.max_data = true;
4247 }
4248 }
4249 Frame::Ack(ack) => {
4250 self.on_ack_received(now, SpaceId::Data, ack)?;
4251 }
4252 Frame::PathAck(ack) => {
4253 span.record("path", tracing::field::debug(&ack.path_id));
4254 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4255 }
4256 Frame::Padding | Frame::Ping => {}
4257 Frame::Close(reason) => {
4258 close = Some(reason);
4259 }
4260 Frame::PathChallenge(challenge) => {
4261 let path = &mut self
4262 .path_mut(path_id)
4263 .expect("payload is processed only after the path becomes known");
4264 path.path_responses.push(number, challenge.0, remote);
4265 if remote == path.remote {
4266 match self.peer_supports_ack_frequency() {
4276 true => self.immediate_ack(path_id),
4277 false => {
4278 self.ping_path(path_id).ok();
4279 }
4280 }
4281 }
4282 }
4283 Frame::PathResponse(response) => {
4284 let path = self
4285 .paths
4286 .get_mut(&path_id)
4287 .expect("payload is processed only after the path becomes known");
4288
4289 use PathTimer::*;
4290 use paths::OnPathResponseReceived::*;
4291 match path.data.on_path_response_received(now, response.0, remote) {
4292 OnPath { was_open } => {
4293 let qlog = self.qlog.with_time(now);
4294
4295 self.timers
4296 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4297 self.timers
4298 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4299
4300 let next_challenge = path
4301 .data
4302 .earliest_expiring_challenge()
4303 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4304 self.timers.set_or_stop(
4305 Timer::PerPath(path_id, PathChallengeLost),
4306 next_challenge,
4307 qlog,
4308 );
4309
4310 if !was_open {
4311 self.events
4312 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4313 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4314 {
4315 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4316 id: path_id,
4317 addr: observed.socket_addr(),
4318 }));
4319 }
4320 }
4321 if let Some((_, ref mut prev)) = path.prev {
4322 prev.challenges_sent.clear();
4323 prev.send_new_challenge = false;
4324 }
4325 }
4326 OffPath => {
4327 debug!("Response to off-path PathChallenge!");
4328 let next_challenge = path
4329 .data
4330 .earliest_expiring_challenge()
4331 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4332 self.timers.set_or_stop(
4333 Timer::PerPath(path_id, PathChallengeLost),
4334 next_challenge,
4335 self.qlog.with_time(now),
4336 );
4337 }
4338 Invalid { expected } => {
4339 debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE")
4340 }
4341 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4342 }
4343 }
4344 Frame::MaxData(bytes) => {
4345 self.streams.received_max_data(bytes);
4346 }
4347 Frame::MaxStreamData { id, offset } => {
4348 self.streams.received_max_stream_data(id, offset)?;
4349 }
4350 Frame::MaxStreams { dir, count } => {
4351 self.streams.received_max_streams(dir, count)?;
4352 }
4353 Frame::ResetStream(frame) => {
4354 if self.streams.received_reset(frame)?.should_transmit() {
4355 self.spaces[SpaceId::Data].pending.max_data = true;
4356 }
4357 }
4358 Frame::DataBlocked { offset } => {
4359 debug!(offset, "peer claims to be blocked at connection level");
4360 }
4361 Frame::StreamDataBlocked { id, offset } => {
4362 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4363 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4364 return Err(TransportError::STREAM_STATE_ERROR(
4365 "STREAM_DATA_BLOCKED on send-only stream",
4366 ));
4367 }
4368 debug!(
4369 stream = %id,
4370 offset, "peer claims to be blocked at stream level"
4371 );
4372 }
4373 Frame::StreamsBlocked { dir, limit } => {
4374 if limit > MAX_STREAM_COUNT {
4375 return Err(TransportError::FRAME_ENCODING_ERROR(
4376 "unrepresentable stream limit",
4377 ));
4378 }
4379 debug!(
4380 "peer claims to be blocked opening more than {} {} streams",
4381 limit, dir
4382 );
4383 }
4384 Frame::StopSending(frame::StopSending { id, error_code }) => {
4385 if id.initiator() != self.side.side() {
4386 if id.dir() == Dir::Uni {
4387 debug!("got STOP_SENDING on recv-only {}", id);
4388 return Err(TransportError::STREAM_STATE_ERROR(
4389 "STOP_SENDING on recv-only stream",
4390 ));
4391 }
4392 } else if self.streams.is_local_unopened(id) {
4393 return Err(TransportError::STREAM_STATE_ERROR(
4394 "STOP_SENDING on unopened stream",
4395 ));
4396 }
4397 self.streams.received_stop_sending(id, error_code);
4398 }
4399 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4400 if let Some(ref path_id) = path_id {
4401 span.record("path", tracing::field::debug(&path_id));
4402 }
4403 let path_id = path_id.unwrap_or_default();
4404 match self.local_cid_state.get_mut(&path_id) {
4405 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4406 Some(cid_state) => {
4407 let allow_more_cids = cid_state
4408 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4409
4410 let has_path = !self.abandoned_paths.contains(&path_id);
4414 let allow_more_cids = allow_more_cids && has_path;
4415
4416 self.endpoint_events
4417 .push_back(EndpointEventInner::RetireConnectionId(
4418 now,
4419 path_id,
4420 sequence,
4421 allow_more_cids,
4422 ));
4423 }
4424 }
4425 }
4426 Frame::NewConnectionId(frame) => {
4427 let path_id = if let Some(path_id) = frame.path_id {
4428 if !self.is_multipath_negotiated() {
4429 return Err(TransportError::PROTOCOL_VIOLATION(
4430 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4431 ));
4432 }
4433 if path_id > self.local_max_path_id {
4434 return Err(TransportError::PROTOCOL_VIOLATION(
4435 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4436 ));
4437 }
4438 path_id
4439 } else {
4440 PathId::ZERO
4441 };
4442
4443 if self.abandoned_paths.contains(&path_id) {
4444 trace!("ignoring issued CID for abandoned path");
4445 continue;
4446 }
4447 if let Some(ref path_id) = frame.path_id {
4448 span.record("path", tracing::field::debug(&path_id));
4449 }
4450 let rem_cids = self
4451 .rem_cids
4452 .entry(path_id)
4453 .or_insert_with(|| CidQueue::new(frame.id));
4454 if rem_cids.active().is_empty() {
4455 return Err(TransportError::PROTOCOL_VIOLATION(
4456 "NEW_CONNECTION_ID when CIDs aren't in use",
4457 ));
4458 }
4459 if frame.retire_prior_to > frame.sequence {
4460 return Err(TransportError::PROTOCOL_VIOLATION(
4461 "NEW_CONNECTION_ID retiring unissued CIDs",
4462 ));
4463 }
4464
4465 use crate::cid_queue::InsertError;
4466 match rem_cids.insert(frame) {
4467 Ok(None) if self.path(path_id).is_none() => {
4468 self.continue_nat_traversal_round(now);
4471 }
4472 Ok(None) => {}
4473 Ok(Some((retired, reset_token))) => {
4474 let pending_retired =
4475 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4476 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4479 if (pending_retired.len() as u64)
4482 .saturating_add(retired.end.saturating_sub(retired.start))
4483 > MAX_PENDING_RETIRED_CIDS
4484 {
4485 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4486 "queued too many retired CIDs",
4487 ));
4488 }
4489 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4490 self.set_reset_token(path_id, remote, reset_token);
4491 }
4492 Err(InsertError::ExceedsLimit) => {
4493 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4494 }
4495 Err(InsertError::Retired) => {
4496 trace!("discarding already-retired");
4497 self.spaces[SpaceId::Data]
4501 .pending
4502 .retire_cids
4503 .push((path_id, frame.sequence));
4504 continue;
4505 }
4506 };
4507
4508 if self.side.is_server()
4509 && path_id == PathId::ZERO
4510 && self
4511 .rem_cids
4512 .get(&PathId::ZERO)
4513 .map(|cids| cids.active_seq() == 0)
4514 .unwrap_or_default()
4515 {
4516 self.update_rem_cid(PathId::ZERO);
4519 }
4520 }
4521 Frame::NewToken(NewToken { token }) => {
4522 let ConnectionSide::Client {
4523 token_store,
4524 server_name,
4525 ..
4526 } = &self.side
4527 else {
4528 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4529 };
4530 if token.is_empty() {
4531 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4532 }
4533 trace!("got new token");
4534 token_store.insert(server_name, token);
4535 }
4536 Frame::Datagram(datagram) => {
4537 if self
4538 .datagrams
4539 .received(datagram, &self.config.datagram_receive_buffer_size)?
4540 {
4541 self.events.push_back(Event::DatagramReceived);
4542 }
4543 }
4544 Frame::AckFrequency(ack_frequency) => {
4545 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4548 continue;
4551 }
4552
4553 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4555 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4556
4557 if let Some(timeout) = space
4560 .pending_acks
4561 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4562 {
4563 self.timers.set(
4564 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4565 timeout,
4566 self.qlog.with_time(now),
4567 );
4568 }
4569 }
4570 }
4571 Frame::ImmediateAck => {
4572 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4574 pns.pending_acks.set_immediate_ack_required();
4575 }
4576 }
4577 Frame::HandshakeDone => {
4578 if self.side.is_server() {
4579 return Err(TransportError::PROTOCOL_VIOLATION(
4580 "client sent HANDSHAKE_DONE",
4581 ));
4582 }
4583 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4584 self.discard_space(now, SpaceId::Handshake);
4585 }
4586 self.events.push_back(Event::HandshakeConfirmed);
4587 trace!("handshake confirmed");
4588 }
4589 Frame::ObservedAddr(observed) => {
4590 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4592 if !self
4593 .peer_params
4594 .address_discovery_role
4595 .should_report(&self.config.address_discovery_role)
4596 {
4597 return Err(TransportError::PROTOCOL_VIOLATION(
4598 "received OBSERVED_ADDRESS frame when not negotiated",
4599 ));
4600 }
4601 if packet.header.space() != SpaceId::Data {
4603 return Err(TransportError::PROTOCOL_VIOLATION(
4604 "OBSERVED_ADDRESS frame outside data space",
4605 ));
4606 }
4607
4608 let path = self.path_data_mut(path_id);
4609 if remote == path.remote {
4610 if let Some(updated) = path.update_observed_addr_report(observed) {
4611 if path.open {
4612 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4613 id: path_id,
4614 addr: updated,
4615 }));
4616 }
4617 }
4619 } else {
4620 migration_observed_addr = Some(observed)
4622 }
4623 }
4624 Frame::PathAbandon(frame::PathAbandon {
4625 path_id,
4626 error_code,
4627 }) => {
4628 span.record("path", tracing::field::debug(&path_id));
4629 let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4631 Ok(()) => {
4632 trace!("peer abandoned path");
4633 false
4634 }
4635 Err(ClosePathError::LastOpenPath) => {
4636 trace!("peer abandoned last path, closing connection");
4637 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4639 }
4640 Err(ClosePathError::ClosedPath) => {
4641 trace!("peer abandoned already closed path");
4642 true
4643 }
4644 };
4645 if self.path(path_id).is_some() && !already_abandoned {
4650 let delay = self.pto(SpaceId::Data, path_id) * 3;
4655 self.timers.set(
4656 Timer::PerPath(path_id, PathTimer::DiscardPath),
4657 now + delay,
4658 self.qlog.with_time(now),
4659 );
4660 }
4661 }
4662 Frame::PathStatusAvailable(info) => {
4663 span.record("path", tracing::field::debug(&info.path_id));
4664 if self.is_multipath_negotiated() {
4665 self.on_path_status(
4666 info.path_id,
4667 PathStatus::Available,
4668 info.status_seq_no,
4669 );
4670 } else {
4671 return Err(TransportError::PROTOCOL_VIOLATION(
4672 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4673 ));
4674 }
4675 }
4676 Frame::PathStatusBackup(info) => {
4677 span.record("path", tracing::field::debug(&info.path_id));
4678 if self.is_multipath_negotiated() {
4679 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4680 } else {
4681 return Err(TransportError::PROTOCOL_VIOLATION(
4682 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4683 ));
4684 }
4685 }
4686 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4687 span.record("path", tracing::field::debug(&path_id));
4688 if !self.is_multipath_negotiated() {
4689 return Err(TransportError::PROTOCOL_VIOLATION(
4690 "received MAX_PATH_ID frame when multipath was not negotiated",
4691 ));
4692 }
4693 if path_id > self.remote_max_path_id {
4695 self.remote_max_path_id = path_id;
4696 self.issue_first_path_cids(now);
4697 while let Some(true) = self.continue_nat_traversal_round(now) {}
4698 }
4699 }
4700 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4701 if self.is_multipath_negotiated() {
4705 if self.local_max_path_id > max_path_id {
4706 return Err(TransportError::PROTOCOL_VIOLATION(
4707 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4708 ));
4709 }
4710 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4711 } else {
4713 return Err(TransportError::PROTOCOL_VIOLATION(
4714 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4715 ));
4716 }
4717 }
4718 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4719 if self.is_multipath_negotiated() {
4727 if path_id > self.local_max_path_id {
4728 return Err(TransportError::PROTOCOL_VIOLATION(
4729 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4730 ));
4731 }
4732 if next_seq.0
4733 > self
4734 .local_cid_state
4735 .get(&path_id)
4736 .map(|cid_state| cid_state.active_seq().1 + 1)
4737 .unwrap_or_default()
4738 {
4739 return Err(TransportError::PROTOCOL_VIOLATION(
4740 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4741 ));
4742 }
4743 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4744 } else {
4745 return Err(TransportError::PROTOCOL_VIOLATION(
4746 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4747 ));
4748 }
4749 }
4750 Frame::AddAddress(addr) => {
4751 let client_state = match self.iroh_hp.client_side_mut() {
4752 Ok(state) => state,
4753 Err(err) => {
4754 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4755 "Nat traversal(ADD_ADDRESS): {err}"
4756 )));
4757 }
4758 };
4759
4760 if !client_state.check_remote_address(&addr) {
4761 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4763 }
4764
4765 match client_state.add_remote_address(addr) {
4766 Ok(maybe_added) => {
4767 if let Some(added) = maybe_added {
4768 self.events.push_back(Event::NatTraversal(
4769 iroh_hp::Event::AddressAdded(added),
4770 ));
4771 }
4772 }
4773 Err(e) => {
4774 warn!(%e, "failed to add remote address")
4775 }
4776 }
4777 }
4778 Frame::RemoveAddress(addr) => {
4779 let client_state = match self.iroh_hp.client_side_mut() {
4780 Ok(state) => state,
4781 Err(err) => {
4782 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4783 "Nat traversal(REMOVE_ADDRESS): {err}"
4784 )));
4785 }
4786 };
4787 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4788 self.events
4789 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4790 removed_addr,
4791 )));
4792 }
4793 }
4794 Frame::ReachOut(reach_out) => {
4795 let server_state = match self.iroh_hp.server_side_mut() {
4796 Ok(state) => state,
4797 Err(err) => {
4798 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4799 "Nat traversal(REACH_OUT): {err}"
4800 )));
4801 }
4802 };
4803
4804 if let Err(err) = server_state.handle_reach_out(reach_out) {
4805 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4806 "Nat traversal(REACH_OUT): {err}"
4807 )));
4808 }
4809 }
4810 }
4811 }
4812
4813 let space = self.spaces[SpaceId::Data].for_path(path_id);
4814 if space
4815 .pending_acks
4816 .packet_received(now, number, ack_eliciting, &space.dedup)
4817 {
4818 if self.abandoned_paths.contains(&path_id) {
4819 space.pending_acks.set_immediate_ack_required();
4822 } else {
4823 self.timers.set(
4824 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4825 now + self.ack_frequency.max_ack_delay,
4826 self.qlog.with_time(now),
4827 );
4828 }
4829 }
4830
4831 let pending = &mut self.spaces[SpaceId::Data].pending;
4836 self.streams.queue_max_stream_id(pending);
4837
4838 if let Some(reason) = close {
4839 self.state.move_to_draining(Some(reason.into()));
4840 self.close = true;
4841 }
4842
4843 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4844 && !is_probing_packet
4845 && remote != self.path_data(path_id).remote
4846 {
4847 let ConnectionSide::Server { ref server_config } = self.side else {
4848 panic!("packets from unknown remote should be dropped by clients");
4849 };
4850 debug_assert!(
4851 server_config.migration,
4852 "migration-initiating packets should have been dropped immediately"
4853 );
4854 self.migrate(path_id, now, remote, migration_observed_addr);
4855 self.update_rem_cid(path_id);
4857 self.spin = false;
4858 }
4859
4860 Ok(())
4861 }
4862
4863 fn migrate(
4864 &mut self,
4865 path_id: PathId,
4866 now: Instant,
4867 remote: SocketAddr,
4868 observed_addr: Option<ObservedAddr>,
4869 ) {
4870 trace!(%remote, %path_id, "migration initiated");
4871 self.path_counter = self.path_counter.wrapping_add(1);
4872 let prev_pto = self.pto(SpaceId::Data, path_id);
4879 let known_path = self.paths.get_mut(&path_id).expect("known path");
4880 let path = &mut known_path.data;
4881 let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4882 PathData::from_previous(remote, path, self.path_counter, now)
4883 } else {
4884 let peer_max_udp_payload_size =
4885 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4886 .unwrap_or(u16::MAX);
4887 PathData::new(
4888 remote,
4889 self.allow_mtud,
4890 Some(peer_max_udp_payload_size),
4891 self.path_counter,
4892 now,
4893 &self.config,
4894 )
4895 };
4896 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4897 if let Some(report) = observed_addr {
4898 if let Some(updated) = new_path.update_observed_addr_report(report) {
4899 tracing::info!("adding observed addr event from migration");
4900 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4901 id: path_id,
4902 addr: updated,
4903 }));
4904 }
4905 }
4906 new_path.send_new_challenge = true;
4907
4908 let mut prev = mem::replace(path, new_path);
4909 if !prev.is_validating_path() {
4911 prev.send_new_challenge = true;
4912 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4916 }
4917
4918 self.qlog.emit_tuple_assigned(path_id, remote, now);
4920
4921 self.timers.set(
4922 Timer::PerPath(path_id, PathTimer::PathValidation),
4923 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4924 self.qlog.with_time(now),
4925 );
4926 }
4927
/// Reacts to a change of the local address.
///
/// Switches path [`PathId::ZERO`] to the next remote connection ID (if one is available)
/// and then calls `ping`.
// NOTE(review): only `PathId::ZERO` is updated here; whether other paths need the same
// treatment when multipath is in use cannot be told from this block — confirm.
pub fn local_address_changed(&mut self) {
    self.update_rem_cid(PathId::ZERO);
    self.ping();
}
4934
4935 fn update_rem_cid(&mut self, path_id: PathId) {
4937 let Some((reset_token, retired)) =
4938 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4939 else {
4940 return;
4941 };
4942
4943 self.spaces[SpaceId::Data]
4945 .pending
4946 .retire_cids
4947 .extend(retired.map(|seq| (path_id, seq)));
4948 let remote = self.path_data(path_id).remote;
4949 self.set_reset_token(path_id, remote, reset_token);
4950 }
4951
4952 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4961 self.endpoint_events
4962 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4963
4964 if path_id == PathId::ZERO {
4970 self.peer_params.stateless_reset_token = Some(reset_token);
4971 }
4972 }
4973
4974 fn issue_first_cids(&mut self, now: Instant) {
4976 if self
4977 .local_cid_state
4978 .get(&PathId::ZERO)
4979 .expect("PathId::ZERO exists when the connection is created")
4980 .cid_len()
4981 == 0
4982 {
4983 return;
4984 }
4985
4986 let mut n = self.peer_params.issue_cids_limit() - 1;
4988 if let ConnectionSide::Server { server_config } = &self.side {
4989 if server_config.has_preferred_address() {
4990 n -= 1;
4992 }
4993 }
4994 self.endpoint_events
4995 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
4996 }
4997
4998 fn issue_first_path_cids(&mut self, now: Instant) {
5002 if let Some(max_path_id) = self.max_path_id() {
5003 let mut path_id = self.max_path_id_with_cids.next();
5004 while path_id <= max_path_id {
5005 self.endpoint_events
5006 .push_back(EndpointEventInner::NeedIdentifiers(
5007 path_id,
5008 now,
5009 self.peer_params.issue_cids_limit(),
5010 ));
5011 path_id = path_id.next();
5012 }
5013 self.max_path_id_with_cids = max_path_id;
5014 }
5015 }
5016
5017 fn populate_packet(
5025 &mut self,
5026 now: Instant,
5027 space_id: SpaceId,
5028 path_id: PathId,
5029 path_exclusive_only: bool,
5030 buf: &mut impl BufMut,
5031 pn: u64,
5032 #[allow(unused)] qlog: &mut QlogSentPacket,
5033 ) -> SentFrames {
5034 let mut sent = SentFrames::default();
5035 let is_multipath_negotiated = self.is_multipath_negotiated();
5036 let space = &mut self.spaces[space_id];
5037 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5038 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5039 space
5040 .for_path(path_id)
5041 .pending_acks
5042 .maybe_ack_non_eliciting();
5043
5044 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5046 trace!("HANDSHAKE_DONE");
5047 buf.write(frame::FrameType::HandshakeDone);
5048 qlog.frame(&Frame::HandshakeDone);
5049 sent.retransmits.get_or_create().handshake_done = true;
5050 self.stats.frame_tx.handshake_done =
5052 self.stats.frame_tx.handshake_done.saturating_add(1);
5053 }
5054
5055 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5058 while let Some(local_addr) = addresses.pop() {
5059 let reach_out = frame::ReachOut::new(*round, local_addr);
5060 if buf.remaining_mut() > reach_out.size() {
5061 trace!(%round, ?local_addr, "REACH_OUT");
5062 reach_out.encode(buf);
5063 let sent_reachouts = sent
5064 .retransmits
5065 .get_or_create()
5066 .reach_out
5067 .get_or_insert_with(|| (*round, Default::default()));
5068 sent_reachouts.1.push(local_addr);
5069 self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5070 qlog.frame(&Frame::ReachOut(reach_out));
5071 } else {
5072 addresses.push(local_addr);
5073 break;
5074 }
5075 }
5076 if addresses.is_empty() {
5077 space.pending.reach_out = None;
5078 }
5079 }
5080
5081 if !path_exclusive_only
5083 && space_id == SpaceId::Data
5084 && self
5085 .config
5086 .address_discovery_role
5087 .should_report(&self.peer_params.address_discovery_role)
5088 && (!path.observed_addr_sent || space.pending.observed_addr)
5089 {
5090 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5091 if buf.remaining_mut() > frame.size() {
5092 trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5093 frame.encode(buf);
5094
5095 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5096 path.observed_addr_sent = true;
5097
5098 self.stats.frame_tx.observed_addr += 1;
5099 sent.retransmits.get_or_create().observed_addr = true;
5100 space.pending.observed_addr = false;
5101 qlog.frame(&Frame::ObservedAddr(frame));
5102 }
5103 }
5104
5105 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5107 trace!("PING");
5108 buf.write(frame::FrameType::Ping);
5109 sent.non_retransmits = true;
5110 self.stats.frame_tx.ping += 1;
5111 qlog.frame(&Frame::Ping);
5112 }
5113
5114 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5116 debug_assert_eq!(
5117 space_id,
5118 SpaceId::Data,
5119 "immediate acks must be sent in the data space"
5120 );
5121 trace!("IMMEDIATE_ACK");
5122 buf.write(frame::FrameType::ImmediateAck);
5123 sent.non_retransmits = true;
5124 self.stats.frame_tx.immediate_ack += 1;
5125 qlog.frame(&Frame::ImmediateAck);
5126 }
5127
5128 if !path_exclusive_only {
5132 for path_id in space
5133 .number_spaces
5134 .iter_mut()
5135 .filter(|(_, pns)| pns.pending_acks.can_send())
5136 .map(|(&path_id, _)| path_id)
5137 .collect::<Vec<_>>()
5138 {
5139 Self::populate_acks(
5140 now,
5141 self.receiving_ecn,
5142 &mut sent,
5143 path_id,
5144 space_id,
5145 space,
5146 is_multipath_negotiated,
5147 buf,
5148 &mut self.stats,
5149 qlog,
5150 );
5151 }
5152 }
5153
5154 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5156 let sequence_number = self.ack_frequency.next_sequence_number();
5157
5158 let config = self.config.ack_frequency_config.as_ref().unwrap();
5160
5161 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5163 path.rtt.get(),
5164 config,
5165 &self.peer_params,
5166 );
5167
5168 trace!(?max_ack_delay, "ACK_FREQUENCY");
5169
5170 let frame = frame::AckFrequency {
5171 sequence: sequence_number,
5172 ack_eliciting_threshold: config.ack_eliciting_threshold,
5173 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5174 reordering_threshold: config.reordering_threshold,
5175 };
5176 frame.encode(buf);
5177 qlog.frame(&Frame::AckFrequency(frame));
5178
5179 sent.retransmits.get_or_create().ack_frequency = true;
5180
5181 self.ack_frequency
5182 .ack_frequency_sent(path_id, pn, max_ack_delay);
5183 self.stats.frame_tx.ack_frequency += 1;
5184 }
5185
5186 if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5188 && space_id == SpaceId::Data
5189 && path.send_new_challenge
5190 && !self.state.is_closed()
5191 {
5193 path.send_new_challenge = false;
5194
5195 let token = self.rng.random();
5197 let info = paths::SentChallengeInfo {
5198 sent_instant: now,
5199 remote: path.remote,
5200 };
5201 path.challenges_sent.insert(token, info);
5202 sent.non_retransmits = true;
5203 sent.requires_padding = true;
5204 let challenge = frame::PathChallenge(token);
5205 trace!(frame = %challenge);
5206 buf.write(challenge);
5207 qlog.frame(&Frame::PathChallenge(challenge));
5208 self.stats.frame_tx.path_challenge += 1;
5209 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5210 self.timers.set(
5211 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5212 now + pto,
5213 self.qlog.with_time(now),
5214 );
5215
5216 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5217 space.pending.path_status.insert(path_id);
5219 }
5220
5221 if space_id == SpaceId::Data
5224 && self
5225 .config
5226 .address_discovery_role
5227 .should_report(&self.peer_params.address_discovery_role)
5228 {
5229 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5230 if buf.remaining_mut() > frame.size() {
5231 frame.encode(buf);
5232 qlog.frame(&Frame::ObservedAddr(frame));
5233
5234 self.next_observed_addr_seq_no =
5235 self.next_observed_addr_seq_no.saturating_add(1u8);
5236 path.observed_addr_sent = true;
5237
5238 self.stats.frame_tx.observed_addr += 1;
5239 sent.retransmits.get_or_create().observed_addr = true;
5240 space.pending.observed_addr = false;
5241 }
5242 }
5243 }
5244
5245 if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5247 if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5248 sent.non_retransmits = true;
5249 sent.requires_padding = true;
5250 let response = frame::PathResponse(token);
5251 trace!(frame = %response);
5252 buf.write(response);
5253 qlog.frame(&Frame::PathResponse(response));
5254 self.stats.frame_tx.path_response += 1;
5255
5256 if space_id == SpaceId::Data
5260 && self
5261 .config
5262 .address_discovery_role
5263 .should_report(&self.peer_params.address_discovery_role)
5264 {
5265 let frame =
5266 frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5267 if buf.remaining_mut() > frame.size() {
5268 frame.encode(buf);
5269 qlog.frame(&Frame::ObservedAddr(frame));
5270
5271 self.next_observed_addr_seq_no =
5272 self.next_observed_addr_seq_no.saturating_add(1u8);
5273 path.observed_addr_sent = true;
5274
5275 self.stats.frame_tx.observed_addr += 1;
5276 sent.retransmits.get_or_create().observed_addr = true;
5277 space.pending.observed_addr = false;
5278 }
5279 }
5280 }
5281 }
5282
5283 while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5285 let mut frame = match space.pending.crypto.pop_front() {
5286 Some(x) => x,
5287 None => break,
5288 };
5289
5290 let max_crypto_data_size = buf.remaining_mut()
5295 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5297 - 2; let len = frame
5300 .data
5301 .len()
5302 .min(2usize.pow(14) - 1)
5303 .min(max_crypto_data_size);
5304
5305 let data = frame.data.split_to(len);
5306 let truncated = frame::Crypto {
5307 offset: frame.offset,
5308 data,
5309 };
5310 trace!(
5311 "CRYPTO: off {} len {}",
5312 truncated.offset,
5313 truncated.data.len()
5314 );
5315 truncated.encode(buf);
5316 self.stats.frame_tx.crypto += 1;
5317
5318 #[cfg(feature = "qlog")]
5320 qlog.frame(&Frame::Crypto(truncated.clone()));
5321 sent.retransmits.get_or_create().crypto.push_back(truncated);
5322 if !frame.data.is_empty() {
5323 frame.offset += len as u64;
5324 space.pending.crypto.push_front(frame);
5325 }
5326 }
5327
5328 while !path_exclusive_only
5331 && space_id == SpaceId::Data
5332 && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5333 {
5334 let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5335 break;
5336 };
5337 let frame = frame::PathAbandon {
5338 path_id,
5339 error_code,
5340 };
5341 frame.encode(buf);
5342 qlog.frame(&Frame::PathAbandon(frame));
5343 self.stats.frame_tx.path_abandon += 1;
5344 trace!(%path_id, "PATH_ABANDON");
5345 sent.retransmits
5346 .get_or_create()
5347 .path_abandon
5348 .entry(path_id)
5349 .or_insert(error_code);
5350 }
5351
5352 while !path_exclusive_only
5354 && space_id == SpaceId::Data
5355 && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5356 {
5357 let Some(path_id) = space.pending.path_status.pop_first() else {
5358 break;
5359 };
5360 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5361 trace!(%path_id, "discarding queued path status for unknown path");
5362 continue;
5363 };
5364
5365 let seq = path.status.seq();
5366 sent.retransmits.get_or_create().path_status.insert(path_id);
5367 match path.local_status() {
5368 PathStatus::Available => {
5369 let frame = frame::PathStatusAvailable {
5370 path_id,
5371 status_seq_no: seq,
5372 };
5373 frame.encode(buf);
5374 qlog.frame(&Frame::PathStatusAvailable(frame));
5375 self.stats.frame_tx.path_status_available += 1;
5376 trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5377 }
5378 PathStatus::Backup => {
5379 let frame = frame::PathStatusBackup {
5380 path_id,
5381 status_seq_no: seq,
5382 };
5383 frame.encode(buf);
5384 qlog.frame(&Frame::PathStatusBackup(frame));
5385 self.stats.frame_tx.path_status_backup += 1;
5386 trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5387 }
5388 }
5389 }
5390
5391 if space_id == SpaceId::Data
5393 && space.pending.max_path_id
5394 && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5395 {
5396 let frame = frame::MaxPathId(self.local_max_path_id);
5397 frame.encode(buf);
5398 qlog.frame(&Frame::MaxPathId(frame));
5399 space.pending.max_path_id = false;
5400 sent.retransmits.get_or_create().max_path_id = true;
5401 trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5402 self.stats.frame_tx.max_path_id += 1;
5403 }
5404
5405 if space_id == SpaceId::Data
5407 && space.pending.paths_blocked
5408 && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5409 {
5410 let frame = frame::PathsBlocked(self.remote_max_path_id);
5411 frame.encode(buf);
5412 qlog.frame(&Frame::PathsBlocked(frame));
5413 space.pending.paths_blocked = false;
5414 sent.retransmits.get_or_create().paths_blocked = true;
5415 trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5416 self.stats.frame_tx.paths_blocked += 1;
5417 }
5418
5419 while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5421 {
5422 let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5423 break;
5424 };
5425 let next_seq = match self.rem_cids.get(&path_id) {
5426 Some(cid_queue) => cid_queue.active_seq() + 1,
5427 None => 0,
5428 };
5429 let frame = frame::PathCidsBlocked {
5430 path_id,
5431 next_seq: VarInt(next_seq),
5432 };
5433 frame.encode(buf);
5434 qlog.frame(&Frame::PathCidsBlocked(frame));
5435 sent.retransmits
5436 .get_or_create()
5437 .path_cids_blocked
5438 .push(path_id);
5439 trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5440 self.stats.frame_tx.path_cids_blocked += 1;
5441 }
5442
5443 if space_id == SpaceId::Data {
5445 self.streams.write_control_frames(
5446 buf,
5447 &mut space.pending,
5448 &mut sent.retransmits,
5449 &mut self.stats.frame_tx,
5450 qlog,
5451 );
5452 }
5453
5454 let cid_len = self
5456 .local_cid_state
5457 .values()
5458 .map(|cid_state| cid_state.cid_len())
5459 .max()
5460 .expect("some local CID state must exist");
5461 let new_cid_size_bound =
5462 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5463 while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5464 let issued = match space.pending.new_cids.pop() {
5465 Some(x) => x,
5466 None => break,
5467 };
5468 let retire_prior_to = self
5469 .local_cid_state
5470 .get(&issued.path_id)
5471 .map(|cid_state| cid_state.retire_prior_to())
5472 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5473
5474 let cid_path_id = match is_multipath_negotiated {
5475 true => {
5476 trace!(
5477 path_id = ?issued.path_id,
5478 sequence = issued.sequence,
5479 id = %issued.id,
5480 "PATH_NEW_CONNECTION_ID",
5481 );
5482 self.stats.frame_tx.path_new_connection_id += 1;
5483 Some(issued.path_id)
5484 }
5485 false => {
5486 trace!(
5487 sequence = issued.sequence,
5488 id = %issued.id,
5489 "NEW_CONNECTION_ID"
5490 );
5491 debug_assert_eq!(issued.path_id, PathId::ZERO);
5492 self.stats.frame_tx.new_connection_id += 1;
5493 None
5494 }
5495 };
5496 let frame = frame::NewConnectionId {
5497 path_id: cid_path_id,
5498 sequence: issued.sequence,
5499 retire_prior_to,
5500 id: issued.id,
5501 reset_token: issued.reset_token,
5502 };
5503 frame.encode(buf);
5504 sent.retransmits.get_or_create().new_cids.push(issued);
5505 qlog.frame(&Frame::NewConnectionId(frame));
5506 }
5507
5508 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5510 while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5511 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5512 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5513 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5514 self.stats.frame_tx.retire_connection_id += 1;
5515 (None, seq)
5516 }
5517 Some((path_id, seq)) => {
5518 trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5519 self.stats.frame_tx.path_retire_connection_id += 1;
5520 (Some(path_id), seq)
5521 }
5522 None => break,
5523 };
5524 let frame = frame::RetireConnectionId { path_id, sequence };
5525 frame.encode(buf);
5526 qlog.frame(&Frame::RetireConnectionId(frame));
5527 sent.retransmits
5528 .get_or_create()
5529 .retire_cids
5530 .push((path_id.unwrap_or_default(), sequence));
5531 }
5532
5533 let mut sent_datagrams = false;
5535 while !path_exclusive_only
5536 && buf.remaining_mut() > Datagram::SIZE_BOUND
5537 && space_id == SpaceId::Data
5538 {
5539 let prev_remaining = buf.remaining_mut();
5540 match self.datagrams.write(buf) {
5541 true => {
5542 sent_datagrams = true;
5543 sent.non_retransmits = true;
5544 self.stats.frame_tx.datagram += 1;
5545 qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5546 }
5547 false => break,
5548 }
5549 }
5550 if self.datagrams.send_blocked && sent_datagrams {
5551 self.events.push_back(Event::DatagramsUnblocked);
5552 self.datagrams.send_blocked = false;
5553 }
5554
5555 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5556
5557 while let Some(remote_addr) = space.pending.new_tokens.pop() {
5559 if path_exclusive_only {
5560 break;
5561 }
5562 debug_assert_eq!(space_id, SpaceId::Data);
5563 let ConnectionSide::Server { server_config } = &self.side else {
5564 panic!("NEW_TOKEN frames should not be enqueued by clients");
5565 };
5566
5567 if remote_addr != path.remote {
5568 continue;
5573 }
5574
5575 let token = Token::new(
5576 TokenPayload::Validation {
5577 ip: remote_addr.ip(),
5578 issued: server_config.time_source.now(),
5579 },
5580 &mut self.rng,
5581 );
5582 let new_token = NewToken {
5583 token: token.encode(&*server_config.token_key).into(),
5584 };
5585
5586 if buf.remaining_mut() < new_token.size() {
5587 space.pending.new_tokens.push(remote_addr);
5588 break;
5589 }
5590
5591 trace!("NEW_TOKEN");
5592 new_token.encode(buf);
5593 qlog.frame(&Frame::NewToken(new_token));
5594 sent.retransmits
5595 .get_or_create()
5596 .new_tokens
5597 .push(remote_addr);
5598 self.stats.frame_tx.new_token += 1;
5599 }
5600
5601 if !path_exclusive_only && space_id == SpaceId::Data {
5603 sent.stream_frames =
5604 self.streams
5605 .write_stream_frames(buf, self.config.send_fairness, qlog);
5606 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5607 }
5608
5609 while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5612 if let Some(added_address) = space.pending.add_address.pop_last() {
5613 trace!(
5614 seq = %added_address.seq_no,
5615 ip = ?added_address.ip,
5616 port = added_address.port,
5617 "ADD_ADDRESS",
5618 );
5619 added_address.encode(buf);
5620 sent.retransmits
5621 .get_or_create()
5622 .add_address
5623 .insert(added_address);
5624 self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5625 qlog.frame(&Frame::AddAddress(added_address));
5626 } else {
5627 break;
5628 }
5629 }
5630
5631 while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5633 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5634 trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5635 removed_address.encode(buf);
5636 sent.retransmits
5637 .get_or_create()
5638 .remove_address
5639 .insert(removed_address);
5640 self.stats.frame_tx.remove_address =
5641 self.stats.frame_tx.remove_address.saturating_add(1);
5642 qlog.frame(&Frame::RemoveAddress(removed_address));
5643 } else {
5644 break;
5645 }
5646 }
5647
5648 sent
5649 }
5650
5651 fn populate_acks(
5653 now: Instant,
5654 receiving_ecn: bool,
5655 sent: &mut SentFrames,
5656 path_id: PathId,
5657 space_id: SpaceId,
5658 space: &mut PacketSpace,
5659 is_multipath_negotiated: bool,
5660 buf: &mut impl BufMut,
5661 stats: &mut ConnectionStats,
5662 #[allow(unused)] qlog: &mut QlogSentPacket,
5663 ) {
5664 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5666
5667 debug_assert!(
5668 is_multipath_negotiated || path_id == PathId::ZERO,
5669 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5670 );
5671 if is_multipath_negotiated {
5672 debug_assert!(
5673 space_id == SpaceId::Data || path_id == PathId::ZERO,
5674 "path acks must be sent in 1RTT space (have {space_id:?})"
5675 );
5676 }
5677
5678 let pns = space.for_path(path_id);
5679 let ranges = pns.pending_acks.ranges();
5680 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5681 let ecn = if receiving_ecn {
5682 Some(&pns.ecn_counters)
5683 } else {
5684 None
5685 };
5686 if let Some(max) = ranges.max() {
5687 sent.largest_acked.insert(path_id, max);
5688 }
5689
5690 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5691 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5693 let delay = delay_micros >> ack_delay_exp.into_inner();
5694
5695 if is_multipath_negotiated && space_id == SpaceId::Data {
5696 if !ranges.is_empty() {
5697 trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5698 frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5699 qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5700 stats.frame_tx.path_acks += 1;
5701 }
5702 } else {
5703 trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5704 frame::Ack::encode(delay as _, ranges, ecn, buf);
5705 stats.frame_tx.acks += 1;
5706 qlog.frame_ack(delay, ranges, ecn);
5707 }
5708 }
5709
    /// Shared teardown step for closing the connection: logs the closure and calls
    /// `reset` on `self.timers`.
    fn close_common(&mut self) {
        trace!("connection closed");
        self.timers.reset();
    }
5714
5715 fn set_close_timer(&mut self, now: Instant) {
5716 let pto_max = self.pto_max_path(self.highest_space, true);
5719 self.timers.set(
5720 Timer::Conn(ConnTimer::Close),
5721 now + 3 * pto_max,
5722 self.qlog.with_time(now),
5723 );
5724 }
5725
5726 fn handle_peer_params(
5731 &mut self,
5732 params: TransportParameters,
5733 loc_cid: ConnectionId,
5734 rem_cid: ConnectionId,
5735 now: Instant,
5736 ) -> Result<(), TransportError> {
5737 if Some(self.orig_rem_cid) != params.initial_src_cid
5738 || (self.side.is_client()
5739 && (Some(self.initial_dst_cid) != params.original_dst_cid
5740 || self.retry_src_cid != params.retry_src_cid))
5741 {
5742 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5743 "CID authentication failure",
5744 ));
5745 }
5746 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5747 return Err(TransportError::PROTOCOL_VIOLATION(
5748 "multipath must not use zero-length CIDs",
5749 ));
5750 }
5751
5752 self.set_peer_params(params);
5753 self.qlog.emit_peer_transport_params_received(self, now);
5754
5755 Ok(())
5756 }
5757
5758 fn set_peer_params(&mut self, params: TransportParameters) {
5759 self.streams.set_params(¶ms);
5760 self.idle_timeout =
5761 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5762 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5763
5764 if let Some(ref info) = params.preferred_address {
5765 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5767 path_id: None,
5768 sequence: 1,
5769 id: info.connection_id,
5770 reset_token: info.stateless_reset_token,
5771 retire_prior_to: 0,
5772 })
5773 .expect(
5774 "preferred address CID is the first received, and hence is guaranteed to be legal",
5775 );
5776 let remote = self.path_data(PathId::ZERO).remote;
5777 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5778 }
5779 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5780
5781 let mut multipath_enabled = None;
5782 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5783 self.config.get_initial_max_path_id(),
5784 params.initial_max_path_id,
5785 ) {
5786 self.local_max_path_id = local_max_path_id;
5788 self.remote_max_path_id = remote_max_path_id;
5789 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5790 debug!(%initial_max_path_id, "multipath negotiated");
5791 multipath_enabled = Some(initial_max_path_id);
5792 }
5793
5794 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5795 self.config
5796 .max_remote_nat_traversal_addresses
5797 .zip(params.max_remote_nat_traversal_addresses)
5798 {
5799 if let Some(max_initial_paths) =
5800 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5801 {
5802 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5803 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5804 self.iroh_hp =
5805 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5806 debug!(
5807 %max_remote_addresses, %max_local_addresses,
5808 "iroh hole punching negotiated"
5809 );
5810
5811 match self.side() {
5812 Side::Client => {
5813 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5814 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5817 } else if max_local_addresses as u64
5818 > params.active_connection_id_limit.into_inner()
5819 {
5820 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5824 }
5825 }
5826 Side::Server => {
5827 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5828 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5829 }
5830 }
5831 }
5832 } else {
5833 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5834 }
5835 }
5836
5837 self.peer_params = params;
5838 let peer_max_udp_payload_size =
5839 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5840 self.path_data_mut(PathId::ZERO)
5841 .mtud
5842 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5843 }
5844
5845 fn decrypt_packet(
5847 &mut self,
5848 now: Instant,
5849 path_id: PathId,
5850 packet: &mut Packet,
5851 ) -> Result<Option<u64>, Option<TransportError>> {
5852 let result = packet_crypto::decrypt_packet_body(
5853 packet,
5854 path_id,
5855 &self.spaces,
5856 self.zero_rtt_crypto.as_ref(),
5857 self.key_phase,
5858 self.prev_crypto.as_ref(),
5859 self.next_crypto.as_ref(),
5860 )?;
5861
5862 let result = match result {
5863 Some(r) => r,
5864 None => return Ok(None),
5865 };
5866
5867 if result.outgoing_key_update_acked {
5868 if let Some(prev) = self.prev_crypto.as_mut() {
5869 prev.end_packet = Some((result.number, now));
5870 self.set_key_discard_timer(now, packet.header.space());
5871 }
5872 }
5873
5874 if result.incoming_key_update {
5875 trace!("key update authenticated");
5876 self.update_keys(Some((result.number, now)), true);
5877 self.set_key_discard_timer(now, packet.header.space());
5878 }
5879
5880 Ok(Some(result.number))
5881 }
5882
5883 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5884 trace!("executing key update");
5885 let new = self
5889 .crypto
5890 .next_1rtt_keys()
5891 .expect("only called for `Data` packets");
5892 self.key_phase_size = new
5893 .local
5894 .confidentiality_limit()
5895 .saturating_sub(KEY_UPDATE_MARGIN);
5896 let old = mem::replace(
5897 &mut self.spaces[SpaceId::Data]
5898 .crypto
5899 .as_mut()
5900 .unwrap() .packet,
5902 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5903 );
5904 self.spaces[SpaceId::Data]
5905 .iter_paths_mut()
5906 .for_each(|s| s.sent_with_keys = 0);
5907 self.prev_crypto = Some(PrevCrypto {
5908 crypto: old,
5909 end_packet,
5910 update_unacked: remote,
5911 });
5912 self.key_phase = !self.key_phase;
5913 }
5914
    /// Returns `true` when the peer's transport parameters contain a `min_ack_delay`.
    fn peer_supports_ack_frequency(&self) -> bool {
        self.peer_params.min_ack_delay.is_some()
    }
5918
5919 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5924 debug_assert_eq!(
5925 self.highest_space,
5926 SpaceId::Data,
5927 "immediate ack must be written in the data space"
5928 );
5929 self.spaces[self.highest_space]
5930 .for_path(path_id)
5931 .immediate_ack_pending = true;
5932 }
5933
5934 #[cfg(test)]
5936 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5937 let (path_id, first_decode, remaining) = match &event.0 {
5938 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5939 path_id,
5940 first_decode,
5941 remaining,
5942 ..
5943 }) => (path_id, first_decode, remaining),
5944 _ => return None,
5945 };
5946
5947 if remaining.is_some() {
5948 panic!("Packets should never be coalesced in tests");
5949 }
5950
5951 let decrypted_header = packet_crypto::unprotect_header(
5952 first_decode.clone(),
5953 &self.spaces,
5954 self.zero_rtt_crypto.as_ref(),
5955 self.peer_params.stateless_reset_token,
5956 )?;
5957
5958 let mut packet = decrypted_header.packet?;
5959 packet_crypto::decrypt_packet_body(
5960 &mut packet,
5961 *path_id,
5962 &self.spaces,
5963 self.zero_rtt_crypto.as_ref(),
5964 self.key_phase,
5965 self.prev_crypto.as_ref(),
5966 self.next_crypto.as_ref(),
5967 )
5968 .ok()?;
5969
5970 Some(packet.payload.to_vec())
5971 }
5972
5973 #[cfg(test)]
5976 pub(crate) fn bytes_in_flight(&self) -> u64 {
5977 self.path_data(PathId::ZERO).in_flight.bytes
5979 }
5980
5981 #[cfg(test)]
5983 pub(crate) fn congestion_window(&self) -> u64 {
5984 let path = self.path_data(PathId::ZERO);
5985 path.congestion
5986 .window()
5987 .saturating_sub(path.in_flight.bytes)
5988 }
5989
5990 #[cfg(test)]
5992 pub(crate) fn is_idle(&self) -> bool {
5993 let current_timers = self.timers.values();
5994 current_timers
5995 .into_iter()
5996 .filter(|(timer, _)| {
5997 !matches!(
5998 timer,
5999 Timer::Conn(ConnTimer::KeepAlive)
6000 | Timer::PerPath(_, PathTimer::PathKeepAlive)
6001 | Timer::Conn(ConnTimer::PushNewCid)
6002 | Timer::Conn(ConnTimer::KeyDiscard)
6003 )
6004 })
6005 .min_by_key(|(_, time)| *time)
6006 .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6007 }
6008
6009 #[cfg(test)]
6011 pub(crate) fn using_ecn(&self) -> bool {
6012 self.path_data(PathId::ZERO).sending_ecn
6013 }
6014
6015 #[cfg(test)]
6017 pub(crate) fn total_recvd(&self) -> u64 {
6018 self.path_data(PathId::ZERO).total_recvd
6019 }
6020
6021 #[cfg(test)]
6022 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6023 self.local_cid_state
6024 .get(&PathId::ZERO)
6025 .unwrap()
6026 .active_seq()
6027 }
6028
6029 #[cfg(test)]
6030 #[track_caller]
6031 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6032 self.local_cid_state
6033 .get(&PathId(path_id))
6034 .unwrap()
6035 .active_seq()
6036 }
6037
6038 #[cfg(test)]
6041 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6042 let n = self
6043 .local_cid_state
6044 .get_mut(&PathId::ZERO)
6045 .unwrap()
6046 .assign_retire_seq(v);
6047 self.endpoint_events
6048 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6049 }
6050
6051 #[cfg(test)]
6053 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6054 self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6055 }
6056
6057 #[cfg(test)]
6059 pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6060 self.path_data(path_id).current_mtu()
6061 }
6062
6063 #[cfg(test)]
6065 pub(crate) fn trigger_path_validation(&mut self) {
6066 for path in self.paths.values_mut() {
6067 path.data.send_new_challenge = true;
6068 }
6069 }
6070
6071 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6082 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6083 path.data.send_new_challenge
6084 || path
6085 .prev
6086 .as_ref()
6087 .is_some_and(|(_, path)| path.send_new_challenge)
6088 || !path.data.path_responses.is_empty()
6089 });
6090 let other = self.streams.can_send_stream_data()
6091 || self
6092 .datagrams
6093 .outgoing
6094 .front()
6095 .is_some_and(|x| x.size(true) <= max_size);
6096 SendableFrames {
6097 acks: false,
6098 other,
6099 close: false,
6100 path_exclusive,
6101 }
6102 }
6103
    /// Terminates the connection immediately for the given `reason`.
    ///
    /// Runs the common close handling, moves the state machine to drained with `reason`
    /// and queues a `Drained` endpoint event.
    fn kill(&mut self, reason: ConnectionError) {
        self.close_common();
        self.state.move_to_drained(Some(reason));
        self.endpoint_events.push_back(EndpointEventInner::Drained);
    }
6110
6111 pub fn current_mtu(&self) -> u16 {
6118 self.paths
6119 .iter()
6120 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6121 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6122 .min()
6123 .expect("There is always at least one available path")
6124 }
6125
6126 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6133 let pn_len = PacketNumber::new(
6134 pn,
6135 self.spaces[SpaceId::Data]
6136 .for_path(path)
6137 .largest_acked_packet
6138 .unwrap_or(0),
6139 )
6140 .len();
6141
6142 1 + self
6144 .rem_cids
6145 .get(&path)
6146 .map(|cids| cids.active().len())
6147 .unwrap_or(20) + pn_len
6149 + self.tag_len_1rtt()
6150 }
6151
6152 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6153 let pn_len = 4;
6154
6155 let cid_len = self
6156 .rem_cids
6157 .values()
6158 .map(|cids| cids.active().len())
6159 .max()
6160 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6164 }
6165
6166 fn tag_len_1rtt(&self) -> usize {
6167 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6168 Some(crypto) => Some(&*crypto.packet.local),
6169 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6170 };
6171 key.map_or(16, |x| x.tag_len())
6175 }
6176
6177 fn on_path_validated(&mut self, path_id: PathId) {
6179 self.path_data_mut(path_id).validated = true;
6180 let ConnectionSide::Server { server_config } = &self.side else {
6181 return;
6182 };
6183 let remote_addr = self.path_data(path_id).remote;
6184 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6185 new_tokens.clear();
6186 for _ in 0..server_config.validation_token.sent {
6187 new_tokens.push(remote_addr);
6188 }
6189 }
6190
6191 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6193 if let Some(path) = self.paths.get_mut(&path_id) {
6194 path.data.status.remote_update(status, status_seq_no);
6195 } else {
6196 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6197 }
6198 self.events.push_back(
6199 PathEvent::RemoteStatus {
6200 id: path_id,
6201 status,
6202 }
6203 .into(),
6204 );
6205 }
6206
6207 fn max_path_id(&self) -> Option<PathId> {
6216 if self.is_multipath_negotiated() {
6217 Some(self.remote_max_path_id.min(self.local_max_path_id))
6218 } else {
6219 None
6220 }
6221 }
6222
6223 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6225 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6226 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6227 };
6228 Ok(())
6229 }
6230
6231 pub fn remove_nat_traversal_address(
6235 &mut self,
6236 address: SocketAddr,
6237 ) -> Result<(), iroh_hp::Error> {
6238 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6239 self.spaces[SpaceId::Data]
6240 .pending
6241 .remove_address
6242 .insert(removed);
6243 }
6244 Ok(())
6245 }
6246
    /// Returns the local NAT traversal addresses as reported by the NAT traversal state.
    ///
    /// # Errors
    ///
    /// Propagates any error from the NAT traversal state.
    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        self.iroh_hp.get_local_nat_traversal_addresses()
    }
6251
6252 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6254 Ok(self
6255 .iroh_hp
6256 .client_side()?
6257 .get_remote_nat_traversal_addresses())
6258 }
6259
6260 fn open_nat_traversal_path(
6268 &mut self,
6269 now: Instant,
6270 (ip, port): (IpAddr, u16),
6271 ipv6: bool,
6272 ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6273 let remote = match ip {
6275 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6276 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6277 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6278 IpAddr::V6(_) => {
6279 trace!("not using IPv6 nat candidate for IPv4 socket");
6280 return Ok(None);
6281 }
6282 };
6283 match self.open_path_ensure(remote, PathStatus::Backup, now) {
6284 Ok((path_id, path_was_known)) => {
6285 if path_was_known {
6286 trace!(%path_id, %remote, "nat traversal: path existed for remote");
6287 }
6288 Ok(Some((path_id, remote, path_was_known)))
6289 }
6290 Err(e) => {
6291 debug!(%remote, %e, "nat traversal: failed to probe remote");
6292 Err(e)
6293 }
6294 }
6295 }
6296
6297 pub fn initiate_nat_traversal_round(
6307 &mut self,
6308 now: Instant,
6309 ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6310 if self.state.is_closed() {
6311 return Err(iroh_hp::Error::Closed);
6312 }
6313
6314 let client_state = self.iroh_hp.client_side_mut()?;
6315 let iroh_hp::NatTraversalRound {
6316 new_round,
6317 reach_out_at,
6318 addresses_to_probe,
6319 prev_round_path_ids,
6320 } = client_state.initiate_nat_traversal_round()?;
6321
6322 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6323
6324 for path_id in prev_round_path_ids {
6325 let validated = self
6328 .path(path_id)
6329 .map(|path| path.validated)
6330 .unwrap_or(false);
6331
6332 if !validated {
6333 let _ = self.close_path(
6334 now,
6335 path_id,
6336 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6337 );
6338 }
6339 }
6340
6341 let mut err = None;
6342
6343 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6344 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6345 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6346
6347 for (id, address) in addresses_to_probe {
6348 match self.open_nat_traversal_path(now, address, ipv6) {
6349 Ok(None) => {}
6350 Ok(Some((path_id, remote, path_was_known))) => {
6351 if !path_was_known {
6352 path_ids.push(path_id);
6353 probed_addresses.push(remote);
6354 }
6355 }
6356 Err(e) => {
6357 self.iroh_hp
6358 .client_side_mut()
6359 .expect("validated")
6360 .report_in_continuation(id, e);
6361 err.get_or_insert(e);
6362 }
6363 }
6364 }
6365
6366 if let Some(err) = err {
6367 if probed_addresses.is_empty() {
6369 return Err(iroh_hp::Error::Multipath(err));
6370 }
6371 }
6372
6373 self.iroh_hp
6374 .client_side_mut()
6375 .expect("connection side validated")
6376 .set_round_path_ids(path_ids);
6377
6378 Ok(probed_addresses)
6379 }
6380
6381 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6386 let client_state = self.iroh_hp.client_side_mut().ok()?;
6387 let (id, address) = client_state.continue_nat_traversal_round()?;
6388 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6389 let open_result = self.open_nat_traversal_path(now, address, ipv6);
6390 let client_state = self.iroh_hp.client_side_mut().expect("validated");
6391 match open_result {
6392 Ok(None) => Some(true),
6393 Ok(Some((path_id, _remote, path_was_known))) => {
6394 if !path_was_known {
6395 client_state.add_round_path_id(path_id);
6396 }
6397 Some(true)
6398 }
6399 Err(e) => {
6400 client_state.report_in_continuation(id, e);
6401 Some(false)
6402 }
6403 }
6404 }
6405}
6406
6407impl fmt::Debug for Connection {
6408 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6409 f.debug_struct("Connection")
6410 .field("handshake_cid", &self.handshake_cid)
6411 .finish()
6412 }
6413}
6414
/// Whether, and by what, a path is blocked.
///
/// NOTE(review): the variant meanings below are inferred from their names; confirm against
/// the code that produces these values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Presumably blocked by the anti-amplification limit.
    AntiAmplification,
    /// Presumably blocked by the congestion controller.
    Congestion,
    /// Presumably blocked by the pacer.
    Pacing,
}
6422
/// Side-specific state of a connection.
enum ConnectionSide {
    /// State held by a client connection.
    Client {
        /// Address validation token; initially taken from `token_store` for `server_name`
        /// (empty if the store has none).
        token: Bytes,
        /// Store that tokens are taken from.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server, used as the key into `token_store`.
        server_name: String,
    },
    /// State held by a server connection.
    Server {
        /// The server configuration this connection runs with.
        server_config: Arc<ServerConfig>,
    },
}
6435
6436impl ConnectionSide {
6437 fn remote_may_migrate(&self, state: &State) -> bool {
6438 match self {
6439 Self::Server { server_config } => server_config.migration,
6440 Self::Client { .. } => {
6441 if let Some(hs) = state.as_handshake() {
6442 hs.allow_server_migration
6443 } else {
6444 false
6445 }
6446 }
6447 }
6448 }
6449
6450 fn is_client(&self) -> bool {
6451 self.side().is_client()
6452 }
6453
6454 fn is_server(&self) -> bool {
6455 self.side().is_server()
6456 }
6457
6458 fn side(&self) -> Side {
6459 match *self {
6460 Self::Client { .. } => Side::Client,
6461 Self::Server { .. } => Side::Server,
6462 }
6463 }
6464}
6465
6466impl From<SideArgs> for ConnectionSide {
6467 fn from(side: SideArgs) -> Self {
6468 match side {
6469 SideArgs::Client {
6470 token_store,
6471 server_name,
6472 } => Self::Client {
6473 token: token_store.take(&server_name).unwrap_or_default(),
6474 token_store,
6475 server_name,
6476 },
6477 SideArgs::Server {
6478 server_config,
6479 pref_addr_cid: _,
6480 path_validated: _,
6481 } => Self::Server { server_config },
6482 }
6483 }
6484}
6485
/// Side-specific arguments used when constructing a connection.
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store that address validation tokens are taken from.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// The server configuration to run with.
        server_config: Arc<ServerConfig>,
        /// Connection ID associated with the preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the initial path is already validated.
        path_validated: bool,
    },
}
6498
6499impl SideArgs {
6500 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6501 match *self {
6502 Self::Client { .. } => None,
6503 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6504 }
6505 }
6506
6507 pub(crate) fn path_validated(&self) -> bool {
6508 match *self {
6509 Self::Client { .. } => true,
6510 Self::Server { path_validated, .. } => path_validated,
6511 }
6512 }
6513
6514 pub(crate) fn side(&self) -> Side {
6515 match *self {
6516 Self::Client { .. } => Side::Client,
6517 Self::Server { .. } => Side::Server,
6518 }
6519 }
6520}
6521
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error occurred; displayed as the wrapped [`TransportError`].
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer aborted the connection with a connection-level close.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection with an application-level close.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The peer reset the connection.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// The connection ran out of connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
6556
6557impl From<Close> for ConnectionError {
6558 fn from(x: Close) -> Self {
6559 match x {
6560 Close::Connection(reason) => Self::ConnectionClosed(reason),
6561 Close::Application(reason) => Self::ApplicationClosed(reason),
6562 }
6563 }
6564}
6565
6566impl From<ConnectionError> for io::Error {
6568 fn from(x: ConnectionError) -> Self {
6569 use ConnectionError::*;
6570 let kind = match x {
6571 TimedOut => io::ErrorKind::TimedOut,
6572 Reset => io::ErrorKind::ConnectionReset,
6573 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6574 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6575 io::ErrorKind::Other
6576 }
6577 };
6578 Self::new(kind, x)
6579 }
6580}
6581
6582#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6585pub enum PathError {
6586 #[error("multipath extension not negotiated")]
6588 MultipathNotNegotiated,
6589 #[error("the server side may not open a path")]
6591 ServerSideNotAllowed,
6592 #[error("maximum number of concurrent paths reached")]
6594 MaxPathIdReached,
6595 #[error("remoted CIDs exhausted")]
6597 RemoteCidsExhausted,
6598 #[error("path validation failed")]
6600 ValidationFailed,
6601 #[error("invalid remote address")]
6603 InvalidRemoteAddress(SocketAddr),
6604}
6605
/// Errors reported when closing a path
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum ClosePathError {
    /// NOTE(review): presumably the path is already closed — confirm at the use site
    #[error("closed path")]
    ClosedPath,
    /// NOTE(review): presumably the path is the last open one and so cannot be closed —
    /// confirm at the use site
    #[error("last open path")]
    LastOpenPath,
}
6616
/// Error indicating that the multipath extension was not negotiated
#[derive(Debug, Error, Clone, Copy)]
#[error("Multipath extension not negotiated")]
pub struct MultipathNotNegotiated {
    // Private unit field: the struct cannot be built with a literal outside this module.
    _private: (),
}
6622
/// Events produced by a connection
///
/// NOTE(review): the per-variant notes below are derived from the variant names and payload
/// types only; confirm the exact emission conditions where the events are generated.
#[derive(Debug)]
pub enum Event {
    /// Handshake data has become available
    HandshakeDataReady,
    /// The connection was established
    Connected,
    /// The handshake was confirmed
    HandshakeConfirmed,
    /// The connection was lost
    ConnectionLost {
        /// Why the connection was lost
        reason: ConnectionError,
    },
    /// A stream-related event
    Stream(StreamEvent),
    /// A datagram was received
    DatagramReceived,
    /// Sending datagrams is no longer blocked
    DatagramsUnblocked,
    /// A path-related event; can be created from a [`PathEvent`] via `From`
    Path(PathEvent),
    /// A NAT traversal event from the `iroh_hp` module
    NatTraversal(iroh_hp::Event),
}
6650
6651impl From<PathEvent> for Event {
6652 fn from(source: PathEvent) -> Self {
6653 Self::Path(source)
6654 }
6655}
6656
6657fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6658 Duration::from_micros(params.max_ack_delay.0 * 1000)
6659}
6660
/// Upper bound for a backoff exponent.
///
/// NOTE(review): presumably caps the exponential backoff of the probe timeout so the shifted
/// duration cannot overflow — confirm at the use sites.
const MAX_BACKOFF_EXPONENT: u32 = 16;
6663
/// The largest Handshake/0-RTT header size plus 32 further bytes.
///
/// NOTE(review): presumably the minimum space that must remain in a datagram before another
/// packet is started in it, with the 32 bytes leaving room for frames — confirm at the use
/// sites.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6672
/// Largest size, in bytes, assumed for the header of a Handshake or 0-RTT packet.
///
/// NOTE(review): the terms presumably correspond, in order, to the long header fields: flags
/// byte (1), version (4), destination CID length (1), destination CID, source CID length (1),
/// source CID, the length field encoded as a varint large enough for `u16::MAX`, and the
/// packet number (4) — confirm against the long header layout.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6680
/// Margin, as a count, used in connection with key updates.
///
/// NOTE(review): presumably the number of packets before a key usage limit at which a key
/// update is initiated — confirm at the use sites.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6685
/// Bookkeeping about the frames recorded for a packet.
///
/// NOTE(review): the field notes are inferred from names and from how `is_ack_only` reads
/// them; confirm against the code that fills this struct in.
#[derive(Default)]
struct SentFrames {
    /// Retransmittable data recorded for the packet
    retransmits: ThinRetransmits,
    /// Per-path `u64` value; `is_ack_only` treats a non-empty map as "acknowledgements were
    /// sent". Presumably the largest acknowledged packet number for each path.
    largest_acked: FxHashMap<PathId, u64>,
    /// Metadata of the stream frames recorded for the packet
    stream_frames: StreamMetaVec,
    /// Set when frames other than the ones tracked above were included; `is_ack_only` returns
    /// `false` whenever this is set
    non_retransmits: bool,
    /// Presumably whether the packet has to be padded — confirm at the use sites
    requires_padding: bool,
}
6697
6698impl SentFrames {
6699 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6701 !self.largest_acked.is_empty()
6702 && !self.non_retransmits
6703 && self.stream_frames.is_empty()
6704 && self.retransmits.is_empty(streams)
6705 }
6706}
6707
6708fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6716 match (x, y) {
6717 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6718 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6719 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6720 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6721 }
6722}
6723
#[cfg(test)]
mod tests {
    use super::*;

    /// Every table entry must give the same result with the arguments in either order.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let param = |raw| Some(VarInt(raw));
        let millis = |ms| Some(Duration::from_millis(ms));

        // (one side, other side, expected negotiated timeout)
        let cases = [
            (None, None, None),
            (None, param(0), None),
            (None, param(2), millis(2)),
            (param(0), param(0), None),
            (param(2), param(0), millis(2)),
            (param(1), param(4), millis(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}