1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23 TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 coding::{BufMutExt, Encodable},
27 config::{ServerConfig, TransportConfig},
28 congestion::Controller,
29 connection::{
30 qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
31 spaces::LostPacket,
32 timer::{ConnTimer, PathTimer},
33 },
34 crypto::{self, KeyPair, Keys, PacketKey},
35 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
36 iroh_hp,
37 packet::{
38 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
39 PacketNumber, PartialDecode, SpaceId,
40 },
41 range_set::ArrayRangeSet,
42 shared::{
43 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44 EndpointEvent, EndpointEventInner,
45 },
46 token::{ResetToken, Token, TokenPayload},
47 transport_parameters::TransportParameters,
48};
49
50mod ack_frequency;
51use ack_frequency::AckFrequencyState;
52
53mod assembler;
54pub use assembler::Chunk;
55
56mod cid_state;
57use cid_state::CidState;
58
59mod datagrams;
60use datagrams::DatagramState;
61pub use datagrams::{Datagrams, SendDatagramError};
62
63mod mtud;
64mod pacing;
65
66mod packet_builder;
67use packet_builder::{PacketBuilder, PadDatagram};
68
69mod packet_crypto;
70use packet_crypto::{PrevCrypto, ZeroRttCrypto};
71
72mod paths;
73pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
74use paths::{PathData, PathState};
75
76pub(crate) mod qlog;
77pub(crate) mod send_buffer;
78
79mod spaces;
80#[cfg(fuzzing)]
81pub use spaces::Retransmits;
82#[cfg(not(fuzzing))]
83use spaces::Retransmits;
84use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
85
86mod stats;
87pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
88
89mod streams;
90#[cfg(fuzzing)]
91pub use streams::StreamsState;
92#[cfg(not(fuzzing))]
93use streams::StreamsState;
94pub use streams::{
95 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
96 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
97};
98
99mod timer;
100use timer::{Timer, TimerTable};
101
102mod transmit_buf;
103use transmit_buf::TransmitBuf;
104
105mod state;
106
107#[cfg(not(fuzzing))]
108use state::State;
109#[cfg(fuzzing)]
110pub use state::State;
111use state::StateType;
112
/// Protocol state for a single connection.
///
/// Owns the per-path state, the packet number spaces, stream and datagram state, timers,
/// and the queues of events handed out by `poll` and `poll_endpoint_events`.
pub struct Connection {
    /// Endpoint-level configuration handed to `new`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration; consulted for the idle timeout, spin bit, stream limits,
    /// segmentation offload, padding and ACK-frequency settings.
    config: Arc<TransportConfig>,
    /// Random number generator seeded from the `rng_seed` argument of `new`.
    rng: StdRng,
    /// Cryptographic session; supplies the Initial keys in `new`.
    crypto: Box<dyn crypto::Session>,
    /// Initialised from the `loc_cid` argument of `new`.
    // NOTE(review): presumably the local CID used during the handshake; confirm.
    handshake_cid: ConnectionId,
    /// Initialised from the `rem_cid` argument of `new`.
    // NOTE(review): presumably the peer's CID used during the handshake; confirm.
    rem_handshake_cid: ConnectionId,
    /// Local IP address, if known; reported as `src_ip` on some outgoing transmits.
    local_ip: Option<IpAddr>,
    /// State of every known path, keyed by path ID.
    ///
    /// `new` inserts `PathId::ZERO`. `close_path` does not remove the entry of an abandoned
    /// path; it only records the ID in `abandoned_paths` and arms a discard timer.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented for every path created by `ensure_path` and passed to `PathData::new`.
    path_counter: u64,
    /// MTU-discovery switch forwarded to `PathData::new` for every path.
    allow_mtud: bool,
    /// Connection state machine (handshake, established, closed, draining, drained).
    state: State,
    /// Built from `SideArgs` in `new`; yields the `Side` through `side()`.
    side: ConnectionSide,
    /// Starts as `false`.
    // NOTE(review): presumably whether 0-RTT was enabled for the handshake; confirm.
    zero_rtt_enabled: bool,
    /// Starts as `None`.
    // NOTE(review): presumably the 0-RTT keys while they are needed; confirm.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Starts as `false`.
    // NOTE(review): presumably the current 1-RTT key phase bit; confirm.
    key_phase: bool,
    /// Initialised to a random value in `10..1000`.
    // NOTE(review): presumably a packet count for the current key phase; confirm.
    key_phase_size: u64,
    /// Transport parameters of the peer; starts as `TransportParameters::default()`.
    peer_params: TransportParameters,
    /// Initialised from the `rem_cid` argument of `new`.
    orig_rem_cid: ConnectionId,
    /// Initialised from the `init_cid` argument of `new`.
    initial_dst_cid: ConnectionId,
    /// Starts as `None`.
    // NOTE(review): presumably the source CID of a received Retry packet; confirm.
    retry_src_cid: Option<ConnectionId>,
    /// Queue of application-facing events drained by `poll`.
    events: VecDeque<Event>,
    /// Queue of endpoint-facing events drained by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// `true` only when `config.allow_spin` is set and a 7-in-8 random draw succeeded.
    spin_enabled: bool,
    /// Starts as `false`.
    // NOTE(review): presumably the outgoing spin bit value; confirm.
    spin: bool,
    /// Packet spaces in the order Initial, Handshake, Data; indexed by `SpaceId`.
    spaces: [PacketSpace; 3],
    /// Starts at `SpaceId::Initial`; `poll_transmit` stops emitting CONNECTION_CLOSE once a
    /// close packet has been written in this space.
    highest_space: SpaceId,
    /// Starts as `None`.
    // NOTE(review): presumably the keys in use before the latest key update; confirm.
    prev_crypto: Option<PrevCrypto>,
    /// Starts as `None`.
    // NOTE(review): presumably the keys for the next key update; confirm.
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Starts as `false`.
    // NOTE(review): presumably whether 0-RTT data was accepted; confirm.
    accepted_0rtt: bool,
    /// Starts as `true`.
    // NOTE(review): presumably gates resetting the idle timer on transmit; confirm.
    permit_idle_reset: bool,
    /// Idle timeout derived from `config.max_idle_timeout` (milliseconds); `None` when the
    /// configured value is absent or zero.
    idle_timeout: Option<Duration>,
    /// Table of pending timers; `poll_timeout` peeks at it.
    timers: TimerTable,
    /// Starts at zero.
    // NOTE(review): presumably counts packets that failed authentication; confirm.
    authentication_failures: u64,

    /// While the state is draining or closed, `poll_transmit` only produces a
    /// CONNECTION_CLOSE packet when this is set, and clears it after writing one in
    /// `highest_space`.
    close: bool,

    /// ACK-frequency state, seeded with the max ACK delay of default transport parameters.
    ack_frequency: AckFrequencyState,

    /// Forwarded to `populate_acks` when ACK frames are written.
    // NOTE(review): presumably whether received packets carried ECN marks; confirm.
    receiving_ecn: bool,
    /// Starts at zero.
    // NOTE(review): presumably the number of authenticated packets received; confirm.
    total_authed_packets: u64,
    /// Updated by `poll_transmit`: `true` when nothing was produced and congestion control
    /// was not what blocked sending.
    app_limited: bool,

    /// Starts at zero.
    // NOTE(review): presumably the next sequence number for observed-address frames; confirm.
    next_observed_addr_seq_no: VarInt,

    /// Stream state; exposed through `streams`, `recv_stream` and `send_stream`.
    streams: StreamsState,
    /// Remote connection IDs per path. `close_path` removes the entry of an abandoned path.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Locally issued connection ID state per path; `new` creates the entry for
    /// `PathId::ZERO`.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// Datagram state; its `outgoing` queue is inspected by `poll_transmit`.
    datagrams: DatagramState,
    /// Connection-wide statistics (frame and UDP counters are updated in `poll_transmit`).
    stats: ConnectionStats,
    /// Per-path statistics, created on demand.
    path_stats: FxHashMap<PathId, PathStats>,
    /// Protocol version passed to `new`.
    version: u32,

    /// Starts at `NonZeroU32::MIN`.
    // NOTE(review): presumably the negotiated limit on concurrently open paths; confirm.
    max_concurrent_paths: NonZeroU32,
    /// Largest path ID allowed locally; `close_path` raises it by one for each path it
    /// abandons.
    local_max_path_id: PathId,
    /// Largest path ID allowed by the peer; `open_path` refuses to go beyond it.
    remote_max_path_id: PathId,
    /// Starts at `PathId::ZERO`.
    // NOTE(review): presumably the largest path ID for which CIDs were issued; confirm.
    max_path_id_with_cids: PathId,
    /// IDs of paths that `close_path` has abandoned.
    abandoned_paths: FxHashSet<PathId>,

    /// Hole-punching state; `poll_transmit` asks its server side for pending probes.
    iroh_hp: iroh_hp::State,
    /// Sink for qlog events.
    qlog: QlogSink,
}
320
321impl Connection {
    /// Creates the state for a new connection whose first path (`PathId::ZERO`) leads to
    /// `remote`.
    ///
    /// * `init_cid` is used to derive the Initial keys and stored as `initial_dst_cid`.
    /// * `loc_cid` and `rem_cid` become `handshake_cid` and `rem_handshake_cid`; `rem_cid`
    ///   additionally seeds `orig_rem_cid` and the remote CID queue of path zero.
    /// * `cid_gen` supplies the CID length and lifetime for the local CID state.
    /// * `rng_seed` seeds the connection's random number generator.
    /// * `side_args` selects client or server behaviour and tells whether the initial path
    ///   already counts as validated.
    ///
    /// On the client side the first crypto data is written and 0-RTT is initialised before
    /// returning.
    pub(crate) fn new(
        endpoint_config: Arc<EndpointConfig>,
        config: Arc<TransportConfig>,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        crypto: Box<dyn crypto::Session>,
        cid_gen: &dyn ConnectionIdGenerator,
        now: Instant,
        version: u32,
        allow_mtud: bool,
        rng_seed: [u8; 32],
        side_args: SideArgs,
        qlog: QlogSink,
    ) -> Self {
        // Read everything needed from `side_args` before it is consumed by the conversion.
        let pref_addr_cid = side_args.pref_addr_cid();
        let path_validated = side_args.path_validated();
        let connection_side = ConnectionSide::from(side_args);
        let side = connection_side.side();
        let mut rng = StdRng::from_seed(rng_seed);
        // Only the Initial space has keys from the start.
        let initial_space = {
            let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
            space.crypto = Some(crypto.initial_keys(init_cid, side));
            space
        };
        let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
        // Test builds may request deterministic packet numbers for the Data space.
        #[cfg(test)]
        let data_space = match config.deterministic_packet_numbers {
            true => PacketSpace::new_deterministic(now, SpaceId::Data),
            false => PacketSpace::new(now, SpaceId::Data, &mut rng),
        };
        #[cfg(not(test))]
        let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
        let state = State::handshake(state::Handshake {
            rem_cid_set: side.is_server(),
            expected_token: Bytes::new(),
            client_hello: None,
            allow_server_migration: side.is_client(),
        });
        let local_cid_state = FxHashMap::from_iter([(
            PathId::ZERO,
            CidState::new(
                cid_gen.cid_len(),
                cid_gen.cid_lifetime(),
                now,
                // NOTE(review): presumably the number of CIDs already issued (the handshake
                // CID plus the preferred-address CID, if any); confirm against `CidState`.
                if pref_addr_cid.is_some() { 2 } else { 1 },
            ),
        )]);

        let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
        path.open = true;
        // Field initialisers run in source order: the two RNG draws below happen before
        // `rng` is moved into the struct, and `config` is read before it is moved.
        let mut this = Self {
            endpoint_config,
            crypto,
            handshake_cid: loc_cid,
            rem_handshake_cid: rem_cid,
            local_cid_state,
            paths: BTreeMap::from_iter([(
                PathId::ZERO,
                PathState {
                    data: path,
                    prev: None,
                },
            )]),
            path_counter: 0,
            allow_mtud,
            local_ip,
            state,
            side: connection_side,
            zero_rtt_enabled: false,
            zero_rtt_crypto: None,
            key_phase: false,
            key_phase_size: rng.random_range(10..1000),
            peer_params: TransportParameters::default(),
            orig_rem_cid: rem_cid,
            initial_dst_cid: init_cid,
            retry_src_cid: None,
            events: VecDeque::new(),
            endpoint_events: VecDeque::new(),
            // The random draw is skipped entirely when the configuration disallows spinning.
            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
            spin: false,
            spaces: [initial_space, handshake_space, data_space],
            highest_space: SpaceId::Initial,
            prev_crypto: None,
            next_crypto: None,
            accepted_0rtt: false,
            permit_idle_reset: true,
            // A configured timeout of zero is treated like no timeout at all.
            idle_timeout: match config.max_idle_timeout {
                None | Some(VarInt(0)) => None,
                Some(dur) => Some(Duration::from_millis(dur.0)),
            },
            timers: TimerTable::default(),
            authentication_failures: 0,
            close: false,

            // The peer's parameters are unknown at this point, so defaults are used.
            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
                &TransportParameters::default(),
            )),

            app_limited: false,
            receiving_ecn: false,
            total_authed_packets: 0,

            next_observed_addr_seq_no: 0u32.into(),

            streams: StreamsState::new(
                side,
                config.max_concurrent_uni_streams,
                config.max_concurrent_bidi_streams,
                config.send_window,
                config.receive_window,
                config.stream_receive_window,
            ),
            datagrams: DatagramState::default(),
            config,
            rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
            rng,
            stats: ConnectionStats::default(),
            path_stats: Default::default(),
            version,

            max_concurrent_paths: NonZeroU32::MIN,
            local_max_path_id: PathId::ZERO,
            remote_max_path_id: PathId::ZERO,
            max_path_id_with_cids: PathId::ZERO,
            abandoned_paths: Default::default(),

            iroh_hp: Default::default(),
            qlog,
        };
        if path_validated {
            this.on_path_validated(PathId::ZERO);
        }
        if side.is_client() {
            // The client starts the handshake: queue its first crypto data and set up 0-RTT.
            this.write_crypto();
            this.init_0rtt(now);
        }
        this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
        this
    }
474
    /// Returns the earliest instant at which one of the connection's timers is due, or
    /// `None` when the timer table has nothing pending.
    #[must_use]
    pub fn poll_timeout(&mut self) -> Option<Instant> {
        self.timers.peek()
    }
486
487 #[must_use]
493 pub fn poll(&mut self) -> Option<Event> {
494 if let Some(x) = self.events.pop_front() {
495 return Some(x);
496 }
497
498 if let Some(event) = self.streams.poll() {
499 return Some(Event::Stream(event));
500 }
501
502 if let Some(reason) = self.state.take_error() {
503 return Some(Event::ConnectionLost { reason });
504 }
505
506 None
507 }
508
509 #[must_use]
511 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
512 self.endpoint_events.pop_front().map(EndpointEvent)
513 }
514
    /// Returns a handle for connection-wide stream operations.
    ///
    /// The handle borrows the stream state mutably and the connection state immutably for
    /// as long as it lives.
    #[must_use]
    pub fn streams(&mut self) -> Streams<'_> {
        Streams {
            state: &mut self.streams,
            conn_state: &self.state,
        }
    }
523
    /// Returns a handle for the receiving side of stream `id`.
    ///
    /// The handle also borrows the pending-frame state of the Data packet space.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not bidirectional and was initiated by this side of the
    /// connection, since such a stream has no receiving side here.
    #[must_use]
    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
        RecvStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
        }
    }
534
    /// Returns a handle for the sending side of stream `id`.
    ///
    /// The handle also borrows the pending-frame state of the Data packet space and the
    /// connection state.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not bidirectional and was initiated by the peer, since such a
    /// stream has no sending side here.
    #[must_use]
    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
        SendStream {
            id,
            state: &mut self.streams,
            pending: &mut self.spaces[SpaceId::Data].pending,
            conn_state: &self.state,
        }
    }
546
547 pub fn open_path_ensure(
554 &mut self,
555 remote: SocketAddr,
556 initial_status: PathStatus,
557 now: Instant,
558 ) -> Result<(PathId, bool), PathError> {
559 match self
560 .paths
561 .iter()
562 .find(|(_id, path)| path.data.remote == remote)
563 {
564 Some((path_id, _state)) => Ok((*path_id, true)),
565 None => self
566 .open_path(remote, initial_status, now)
567 .map(|id| (id, false)),
568 }
569 }
570
571 pub fn open_path(
576 &mut self,
577 remote: SocketAddr,
578 initial_status: PathStatus,
579 now: Instant,
580 ) -> Result<PathId, PathError> {
581 if !self.is_multipath_negotiated() {
582 return Err(PathError::MultipathNotNegotiated);
583 }
584 if self.side().is_server() {
585 return Err(PathError::ServerSideNotAllowed);
586 }
587
588 let max_abandoned = self.abandoned_paths.iter().max().copied();
589 let max_used = self.paths.keys().last().copied();
590 let path_id = max_abandoned
591 .max(max_used)
592 .unwrap_or(PathId::ZERO)
593 .saturating_add(1u8);
594
595 if Some(path_id) > self.max_path_id() {
596 return Err(PathError::MaxPathIdReached);
597 }
598 if path_id > self.remote_max_path_id {
599 self.spaces[SpaceId::Data].pending.paths_blocked = true;
600 return Err(PathError::MaxPathIdReached);
601 }
602 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
603 self.spaces[SpaceId::Data]
604 .pending
605 .path_cids_blocked
606 .push(path_id);
607 return Err(PathError::RemoteCidsExhausted);
608 }
609
610 let path = self.ensure_path(path_id, remote, now, None);
611 path.status.local_update(initial_status);
612
613 Ok(path_id)
614 }
615
616 pub fn close_path(
622 &mut self,
623 now: Instant,
624 path_id: PathId,
625 error_code: VarInt,
626 ) -> Result<(), ClosePathError> {
627 if self.abandoned_paths.contains(&path_id)
628 || Some(path_id) > self.max_path_id()
629 || !self.paths.contains_key(&path_id)
630 {
631 return Err(ClosePathError::ClosedPath);
632 }
633 if self
634 .paths
635 .iter()
636 .any(|(id, path)| {
638 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
639 })
640 .not()
641 {
642 return Err(ClosePathError::LastOpenPath);
643 }
644
645 self.spaces[SpaceId::Data]
647 .pending
648 .path_abandon
649 .insert(path_id, error_code.into());
650
651 let pending_space = &mut self.spaces[SpaceId::Data].pending;
653 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
654 pending_space.path_cids_blocked.retain(|&id| id != path_id);
655 pending_space.path_status.retain(|&id| id != path_id);
656
657 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
659 for sent_packet in space.sent_packets.values_mut() {
660 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
661 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
662 retransmits.path_cids_blocked.retain(|&id| id != path_id);
663 retransmits.path_status.retain(|&id| id != path_id);
664 }
665 }
666 }
667
668 self.rem_cids.remove(&path_id);
674 self.endpoint_events
675 .push_back(EndpointEventInner::RetireResetToken(path_id));
676
677 let pto = self.pto_max_path(SpaceId::Data, false);
678
679 let path = self.paths.get_mut(&path_id).expect("checked above");
680
681 path.data.last_allowed_receive = Some(now + 3 * pto);
683 self.abandoned_paths.insert(path_id);
684
685 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
686
687 self.timers.set(
692 Timer::PerPath(path_id, PathTimer::DiscardPath),
693 now + 6 * pto,
694 self.qlog.with_time(now),
695 );
696 Ok(())
697 }
698
699 #[track_caller]
703 fn path_data(&self, path_id: PathId) -> &PathData {
704 if let Some(data) = self.paths.get(&path_id) {
705 &data.data
706 } else {
707 panic!(
708 "unknown path: {path_id}, currently known paths: {:?}",
709 self.paths.keys().collect::<Vec<_>>()
710 );
711 }
712 }
713
714 fn path(&self, path_id: PathId) -> Option<&PathData> {
716 self.paths.get(&path_id).map(|path_state| &path_state.data)
717 }
718
719 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
721 self.paths
722 .get_mut(&path_id)
723 .map(|path_state| &mut path_state.data)
724 }
725
726 pub fn paths(&self) -> Vec<PathId> {
730 self.paths.keys().copied().collect()
731 }
732
733 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
735 self.path(path_id)
736 .map(PathData::local_status)
737 .ok_or(ClosedPath { _private: () })
738 }
739
740 pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
742 self.path(path_id)
743 .map(|path| path.remote)
744 .ok_or(ClosedPath { _private: () })
745 }
746
747 pub fn set_path_status(
751 &mut self,
752 path_id: PathId,
753 status: PathStatus,
754 ) -> Result<PathStatus, SetPathStatusError> {
755 if !self.is_multipath_negotiated() {
756 return Err(SetPathStatusError::MultipathNotNegotiated);
757 }
758 let path = self
759 .path_mut(path_id)
760 .ok_or(SetPathStatusError::ClosedPath)?;
761 let prev = match path.status.local_update(status) {
762 Some(prev) => {
763 self.spaces[SpaceId::Data]
764 .pending
765 .path_status
766 .insert(path_id);
767 prev
768 }
769 None => path.local_status(),
770 };
771 Ok(prev)
772 }
773
774 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
779 self.path(path_id).and_then(|path| path.remote_status())
780 }
781
782 pub fn set_path_max_idle_timeout(
788 &mut self,
789 path_id: PathId,
790 timeout: Option<Duration>,
791 ) -> Result<Option<Duration>, ClosedPath> {
792 let path = self
793 .paths
794 .get_mut(&path_id)
795 .ok_or(ClosedPath { _private: () })?;
796 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
797 }
798
799 pub fn set_path_keep_alive_interval(
805 &mut self,
806 path_id: PathId,
807 interval: Option<Duration>,
808 ) -> Result<Option<Duration>, ClosedPath> {
809 let path = self
810 .paths
811 .get_mut(&path_id)
812 .ok_or(ClosedPath { _private: () })?;
813 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
814 }
815
816 #[track_caller]
820 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
821 &mut self.paths.get_mut(&path_id).expect("known path").data
822 }
823
824 fn ensure_path(
825 &mut self,
826 path_id: PathId,
827 remote: SocketAddr,
828 now: Instant,
829 pn: Option<u64>,
830 ) -> &mut PathData {
831 let vacant_entry = match self.paths.entry(path_id) {
832 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
833 btree_map::Entry::Occupied(occupied_entry) => {
834 return &mut occupied_entry.into_mut().data;
835 }
836 };
837
838 debug!(%path_id, ?remote, "path added");
841 let peer_max_udp_payload_size =
842 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
843 self.path_counter = self.path_counter.wrapping_add(1);
844 let mut data = PathData::new(
845 remote,
846 self.allow_mtud,
847 Some(peer_max_udp_payload_size),
848 self.path_counter,
849 now,
850 &self.config,
851 );
852
853 let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
854 self.timers.set(
855 Timer::PerPath(path_id, PathTimer::PathOpen),
856 now + 3 * pto,
857 self.qlog.with_time(now),
858 );
859
860 data.send_new_challenge = true;
863
864 let path = vacant_entry.insert(PathState { data, prev: None });
865
866 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
867 if let Some(pn) = pn {
868 pn_space.dedup.insert(pn);
869 }
870 self.spaces[SpaceId::Data]
871 .number_spaces
872 .insert(path_id, pn_space);
873 self.qlog.emit_tuple_assigned(path_id, remote, now);
874 &mut path.data
875 }
876
877 #[must_use]
887 pub fn poll_transmit(
888 &mut self,
889 now: Instant,
890 max_datagrams: NonZeroUsize,
891 buf: &mut Vec<u8>,
892 ) -> Option<Transmit> {
893 if let Some(probing) = self
894 .iroh_hp
895 .server_side_mut()
896 .ok()
897 .and_then(iroh_hp::ServerState::next_probe)
898 {
899 let destination = probing.remote();
900 trace!(%destination, "RAND_DATA packet");
901 let token: u64 = self.rng.random();
902 buf.put_u64(token);
903 probing.finish(token);
904 return Some(Transmit {
905 destination,
906 ecn: None,
907 size: 8,
908 segment_size: None,
909 src_ip: None,
910 });
911 }
912
913 let max_datagrams = match self.config.enable_segmentation_offload {
914 false => NonZeroUsize::MIN,
915 true => max_datagrams,
916 };
917
918 let close = match self.state.as_type() {
937 StateType::Drained => {
938 self.app_limited = true;
939 return None;
940 }
941 StateType::Draining | StateType::Closed => {
942 if !self.close {
945 self.app_limited = true;
946 return None;
947 }
948 true
949 }
950 _ => false,
951 };
952
953 if let Some(config) = &self.config.ack_frequency_config {
955 let rtt = self
956 .paths
957 .values()
958 .map(|p| p.data.rtt.get())
959 .min()
960 .expect("one path exists");
961 self.spaces[SpaceId::Data].pending.ack_frequency = self
962 .ack_frequency
963 .should_send_ack_frequency(rtt, config, &self.peer_params)
964 && self.highest_space == SpaceId::Data
965 && self.peer_supports_ack_frequency();
966 }
967
968 let mut coalesce = true;
970
971 let mut pad_datagram = PadDatagram::No;
974
975 let mut congestion_blocked = false;
979
980 let mut last_packet_number = None;
982
983 let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
984
985 let have_available_path = self.paths.iter().any(|(id, path)| {
988 path.data.validated
989 && path.data.local_status() == PathStatus::Available
990 && self.rem_cids.contains_key(id)
991 });
992
993 let mut transmit = TransmitBuf::new(
995 buf,
996 max_datagrams,
997 self.path_data(path_id).current_mtu().into(),
998 );
999 if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1000 return Some(challenge);
1001 }
1002 let mut space_id = match path_id {
1003 PathId::ZERO => SpaceId::Initial,
1004 _ => SpaceId::Data,
1005 };
1006
1007 loop {
1008 let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1010 let err = PathError::RemoteCidsExhausted;
1011 if !self.abandoned_paths.contains(&path_id) {
1012 debug!(?err, %path_id, "no active CID for path");
1013 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1014 id: path_id,
1015 error: err,
1016 }));
1017 self.close_path(
1021 now,
1022 path_id,
1023 TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1024 )
1025 .ok();
1026 self.spaces[SpaceId::Data]
1027 .pending
1028 .path_cids_blocked
1029 .push(path_id);
1030 } else {
1031 trace!(%path_id, "remote CIDs retired for abandoned path");
1032 }
1033
1034 match self.paths.keys().find(|&&next| next > path_id) {
1035 Some(next_path_id) => {
1036 path_id = *next_path_id;
1038 space_id = SpaceId::Data;
1039
1040 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1042 if let Some(challenge) =
1043 self.send_prev_path_challenge(now, &mut transmit, path_id)
1044 {
1045 return Some(challenge);
1046 }
1047
1048 continue;
1049 }
1050 None => {
1051 trace!(
1053 ?space_id,
1054 %path_id,
1055 "no CIDs to send on path, no more paths"
1056 );
1057 break;
1058 }
1059 }
1060 };
1061
1062 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1065 transmit.datagram_remaining_mut()
1067 } else {
1068 transmit.segment_size()
1070 };
1071 let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1072 let path_should_send = {
1073 let path_exclusive_only = space_id == SpaceId::Data
1074 && have_available_path
1075 && self.path_data(path_id).local_status() == PathStatus::Backup;
1076 let path_should_send = if path_exclusive_only {
1077 can_send.path_exclusive
1078 } else {
1079 !can_send.is_empty()
1080 };
1081 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1082 path_should_send || needs_loss_probe
1083 };
1084
1085 if !path_should_send && space_id < SpaceId::Data {
1086 if self.spaces[space_id].crypto.is_some() {
1087 trace!(?space_id, %path_id, "nothing to send in space");
1088 }
1089 space_id = space_id.next();
1090 continue;
1091 }
1092
1093 let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1094 self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1096 } else {
1097 PathBlocked::No
1098 };
1099 if send_blocked != PathBlocked::No {
1100 trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1101 congestion_blocked = true;
1102 }
1103 if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1104 space_id = space_id.next();
1107 continue;
1108 }
1109 if !path_should_send || send_blocked != PathBlocked::No {
1110 if transmit.num_datagrams() > 0 {
1115 break;
1116 }
1117
1118 match self.paths.keys().find(|&&next| next > path_id) {
1119 Some(next_path_id) => {
1120 trace!(
1122 ?space_id,
1123 %path_id,
1124 %next_path_id,
1125 "nothing to send on path"
1126 );
1127 path_id = *next_path_id;
1128 space_id = SpaceId::Data;
1129
1130 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1132 if let Some(challenge) =
1133 self.send_prev_path_challenge(now, &mut transmit, path_id)
1134 {
1135 return Some(challenge);
1136 }
1137
1138 continue;
1139 }
1140 None => {
1141 trace!(
1143 ?space_id,
1144 %path_id,
1145 next_path_id=?None::<PathId>,
1146 "nothing to send on path"
1147 );
1148 break;
1149 }
1150 }
1151 }
1152
1153 if transmit.datagram_remaining_mut() == 0 {
1155 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1156 break;
1158 }
1159
1160 match self.spaces[space_id].for_path(path_id).loss_probes {
1161 0 => transmit.start_new_datagram(),
1162 _ => {
1163 let request_immediate_ack =
1165 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1166 self.spaces[space_id].maybe_queue_probe(
1167 path_id,
1168 request_immediate_ack,
1169 &self.streams,
1170 );
1171
1172 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1173
1174 transmit.start_new_datagram_with_size(std::cmp::min(
1178 usize::from(INITIAL_MTU),
1179 transmit.segment_size(),
1180 ));
1181 }
1182 }
1183 trace!(count = transmit.num_datagrams(), "new datagram started");
1184 coalesce = true;
1185 pad_datagram = PadDatagram::No;
1186 }
1187
1188 if transmit.datagram_start_offset() < transmit.len() {
1191 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1192 }
1193
1194 if self.spaces[SpaceId::Initial].crypto.is_some()
1199 && space_id == SpaceId::Handshake
1200 && self.side.is_client()
1201 {
1202 self.discard_space(now, SpaceId::Initial);
1205 }
1206 if let Some(ref mut prev) = self.prev_crypto {
1207 prev.update_unacked = false;
1208 }
1209
1210 let mut qlog = QlogSentPacket::default();
1211 let mut builder = PacketBuilder::new(
1212 now,
1213 space_id,
1214 path_id,
1215 remote_cid,
1216 &mut transmit,
1217 can_send.other,
1218 self,
1219 &mut qlog,
1220 )?;
1221 last_packet_number = Some(builder.exact_number);
1222 coalesce = coalesce && !builder.short_header;
1223
1224 if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1225 pad_datagram |= PadDatagram::ToMinMtu;
1227 }
1228 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1229 pad_datagram |= PadDatagram::ToSegmentSize;
1230 }
1231
1232 if can_send.close {
1233 trace!("sending CONNECTION_CLOSE");
1234 let mut sent_frames = SentFrames::default();
1239 let is_multipath_negotiated = self.is_multipath_negotiated();
1240 for path_id in self.spaces[space_id]
1241 .number_spaces
1242 .iter()
1243 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1244 .map(|(&path_id, _)| path_id)
1245 .collect::<Vec<_>>()
1246 {
1247 Self::populate_acks(
1248 now,
1249 self.receiving_ecn,
1250 &mut sent_frames,
1251 path_id,
1252 space_id,
1253 &mut self.spaces[space_id],
1254 is_multipath_negotiated,
1255 &mut builder.frame_space_mut(),
1256 &mut self.stats,
1257 &mut qlog,
1258 );
1259 }
1260
1261 debug_assert!(
1265 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1266 "ACKs should leave space for ConnectionClose"
1267 );
1268 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1269 let max_frame_size = builder.frame_space_remaining();
1270 match self.state.as_type() {
1271 StateType::Closed => {
1272 let reason: Close =
1273 self.state.as_closed().expect("checked").clone().into();
1274 if space_id == SpaceId::Data || reason.is_transport_layer() {
1275 reason
1276 .encoder(max_frame_size)
1277 .encode(&mut builder.frame_space_mut());
1278 qlog.frame(&Frame::Close(reason));
1279 } else {
1280 let frame = frame::ConnectionClose {
1281 error_code: TransportErrorCode::APPLICATION_ERROR,
1282 frame_type: frame::MaybeFrame::None,
1283 reason: Bytes::new(),
1284 };
1285 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1286 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1287 }
1288 }
1289 StateType::Draining => {
1290 let frame = frame::ConnectionClose {
1291 error_code: TransportErrorCode::NO_ERROR,
1292 frame_type: frame::MaybeFrame::None,
1293 reason: Bytes::new(),
1294 };
1295 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1296 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1297 }
1298 _ => unreachable!(
1299 "tried to make a close packet when the connection wasn't closed"
1300 ),
1301 };
1302 }
1303 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1304 if space_id == self.highest_space {
1305 self.close = false;
1308 break;
1310 } else {
1311 space_id = space_id.next();
1315 continue;
1316 }
1317 }
1318
1319 if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1322 let path = self.path_data_mut(path_id);
1323 if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1324 let response = frame::PathResponse(token);
1328 trace!(%response, "(off-path)");
1329 builder.frame_space_mut().write(response);
1330 qlog.frame(&Frame::PathResponse(response));
1331 self.stats.frame_tx.path_response += 1;
1332 builder.finish_and_track(
1333 now,
1334 self,
1335 path_id,
1336 SentFrames {
1337 non_retransmits: true,
1338 ..SentFrames::default()
1339 },
1340 PadDatagram::ToMinMtu,
1341 qlog,
1342 );
1343 self.stats.udp_tx.on_sent(1, transmit.len());
1344 return Some(Transmit {
1345 destination: remote,
1346 size: transmit.len(),
1347 ecn: None,
1348 segment_size: None,
1349 src_ip: self.local_ip,
1350 });
1351 }
1352 }
1353
1354 let sent_frames = {
1355 let path_exclusive_only = have_available_path
1356 && self.path_data(path_id).local_status() == PathStatus::Backup;
1357 let pn = builder.exact_number;
1358 self.populate_packet(
1359 now,
1360 space_id,
1361 path_id,
1362 path_exclusive_only,
1363 &mut builder.frame_space_mut(),
1364 pn,
1365 &mut qlog,
1366 )
1367 };
1368
1369 debug_assert!(
1376 !(sent_frames.is_ack_only(&self.streams)
1377 && !can_send.acks
1378 && can_send.other
1379 && builder.buf.segment_size()
1380 == self.path_data(path_id).current_mtu() as usize
1381 && self.datagrams.outgoing.is_empty()),
1382 "SendableFrames was {can_send:?}, but only ACKs have been written"
1383 );
1384 if sent_frames.requires_padding {
1385 pad_datagram |= PadDatagram::ToMinMtu;
1386 }
1387
1388 for (path_id, _pn) in sent_frames.largest_acked.iter() {
1389 self.spaces[space_id]
1390 .for_path(*path_id)
1391 .pending_acks
1392 .acks_sent();
1393 self.timers.stop(
1394 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1395 self.qlog.with_time(now),
1396 );
1397 }
1398
1399 if coalesce
1407 && builder
1408 .buf
1409 .datagram_remaining_mut()
1410 .saturating_sub(builder.predict_packet_end())
1411 > MIN_PACKET_SPACE
1412 && self
1413 .next_send_space(space_id, path_id, builder.buf, close)
1414 .is_some()
1415 {
1416 builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1419 } else {
1420 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1423 const MAX_PADDING: usize = 32;
1431 if builder.buf.datagram_remaining_mut()
1432 > builder.predict_packet_end() + MAX_PADDING
1433 {
1434 trace!(
1435 "GSO truncated by demand for {} padding bytes",
1436 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1437 );
1438 builder.finish_and_track(
1439 now,
1440 self,
1441 path_id,
1442 sent_frames,
1443 PadDatagram::No,
1444 qlog,
1445 );
1446 break;
1447 }
1448
1449 builder.finish_and_track(
1452 now,
1453 self,
1454 path_id,
1455 sent_frames,
1456 PadDatagram::ToSegmentSize,
1457 qlog,
1458 );
1459 } else {
1460 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1461 }
1462 if transmit.num_datagrams() == 1 {
1463 transmit.clip_datagram_size();
1464 }
1465 }
1466 }
1467
1468 if let Some(last_packet_number) = last_packet_number {
1469 self.path_data_mut(path_id).congestion.on_sent(
1472 now,
1473 transmit.len() as u64,
1474 last_packet_number,
1475 );
1476 }
1477
1478 self.qlog.emit_recovery_metrics(
1479 path_id,
1480 &mut self.paths.get_mut(&path_id).unwrap().data,
1481 now,
1482 );
1483
1484 self.app_limited = transmit.is_empty() && !congestion_blocked;
1485
1486 if transmit.is_empty() && self.state.is_established() {
1488 let space_id = SpaceId::Data;
1490 path_id = *self.paths.first_key_value().expect("one path must exist").0;
1491 let probe_data = loop {
1492 let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1498 let eligible = self.path_data(path_id).validated
1499 && !self.path_data(path_id).is_validating_path()
1500 && !self.abandoned_paths.contains(&path_id);
1501 let probe_size = eligible
1502 .then(|| {
1503 let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1504 self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1505 })
1506 .flatten();
1507 match (active_cid, probe_size) {
1508 (Some(active_cid), Some(probe_size)) => {
1509 break Some((active_cid, probe_size));
1511 }
1512 _ => {
1513 match self.paths.keys().find(|&&next| next > path_id) {
1515 Some(next) => {
1516 path_id = *next;
1517 continue;
1518 }
1519 None => break None,
1520 }
1521 }
1522 }
1523 };
1524 if let Some((active_cid, probe_size)) = probe_data {
1525 debug_assert_eq!(transmit.num_datagrams(), 0);
1527 transmit.start_new_datagram_with_size(probe_size as usize);
1528
1529 let mut qlog = QlogSentPacket::default();
1530 let mut builder = PacketBuilder::new(
1531 now,
1532 space_id,
1533 path_id,
1534 active_cid,
1535 &mut transmit,
1536 true,
1537 self,
1538 &mut qlog,
1539 )?;
1540
1541 trace!(?probe_size, "writing MTUD probe");
1543 trace!("PING");
1544 builder.frame_space_mut().write(frame::FrameType::Ping);
1545 qlog.frame(&Frame::Ping);
1546 self.stats.frame_tx.ping += 1;
1547
1548 if self.peer_supports_ack_frequency() {
1550 trace!("IMMEDIATE_ACK");
1551 builder
1552 .frame_space_mut()
1553 .write(frame::FrameType::ImmediateAck);
1554 self.stats.frame_tx.immediate_ack += 1;
1555 qlog.frame(&Frame::ImmediateAck);
1556 }
1557
1558 let sent_frames = SentFrames {
1559 non_retransmits: true,
1560 ..Default::default()
1561 };
1562 builder.finish_and_track(
1563 now,
1564 self,
1565 path_id,
1566 sent_frames,
1567 PadDatagram::ToSize(probe_size),
1568 qlog,
1569 );
1570
1571 self.path_stats
1572 .entry(path_id)
1573 .or_default()
1574 .sent_plpmtud_probes += 1;
1575 }
1576 }
1577
1578 if transmit.is_empty() {
1579 return None;
1580 }
1581
1582 let destination = self.path_data(path_id).remote;
1583 trace!(
1584 segment_size = transmit.segment_size(),
1585 last_datagram_len = transmit.len() % transmit.segment_size(),
1586 ?destination,
1587 "sending {} bytes in {} datagrams",
1588 transmit.len(),
1589 transmit.num_datagrams()
1590 );
1591 self.path_data_mut(path_id)
1592 .inc_total_sent(transmit.len() as u64);
1593
1594 self.stats
1595 .udp_tx
1596 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1597
1598 Some(Transmit {
1599 destination,
1600 size: transmit.len(),
1601 ecn: if self.path_data(path_id).sending_ecn {
1602 Some(EcnCodepoint::Ect0)
1603 } else {
1604 None
1605 },
1606 segment_size: match transmit.num_datagrams() {
1607 1 => None,
1608 _ => Some(transmit.segment_size()),
1609 },
1610 src_ip: self.local_ip,
1611 })
1612 }
1613
1614 fn next_send_space(
1619 &mut self,
1620 current_space_id: SpaceId,
1621 path_id: PathId,
1622 buf: &TransmitBuf<'_>,
1623 close: bool,
1624 ) -> Option<SpaceId> {
1625 let mut space_id = current_space_id;
1632 loop {
1633 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1634 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1635 return Some(space_id);
1636 }
1637 space_id = match space_id {
1638 SpaceId::Initial => SpaceId::Handshake,
1639 SpaceId::Handshake => SpaceId::Data,
1640 SpaceId::Data => break,
1641 }
1642 }
1643 None
1644 }
1645
1646 fn path_congestion_check(
1648 &mut self,
1649 space_id: SpaceId,
1650 path_id: PathId,
1651 transmit: &TransmitBuf<'_>,
1652 can_send: &SendableFrames,
1653 now: Instant,
1654 ) -> PathBlocked {
1655 if self.side().is_server()
1661 && self
1662 .path_data(path_id)
1663 .anti_amplification_blocked(transmit.len() as u64 + 1)
1664 {
1665 trace!(?space_id, %path_id, "blocked by anti-amplification");
1666 return PathBlocked::AntiAmplification;
1667 }
1668
1669 let bytes_to_send = transmit.segment_size() as u64;
1672 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1673
1674 if can_send.other && !need_loss_probe && !can_send.close {
1675 let path = self.path_data(path_id);
1676 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1677 trace!(?space_id, %path_id, "blocked by congestion control");
1678 return PathBlocked::Congestion;
1679 }
1680 }
1681
1682 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1684 self.timers.set(
1685 Timer::PerPath(path_id, PathTimer::Pacing),
1686 delay,
1687 self.qlog.with_time(now),
1688 );
1689 trace!(?space_id, %path_id, "blocked by pacing");
1692 return PathBlocked::Pacing;
1693 }
1694
1695 PathBlocked::No
1696 }
1697
    /// Sends a PATH_CHALLENGE to the previous path stored for `path_id`, if one is requested.
    ///
    /// Returns `None` when `path_id` is unknown, when it has no previous path, when the
    /// previous path does not have `send_new_challenge` set, or when the packet builder
    /// could not be created. Otherwise a single datagram is written into `buf` and the
    /// matching [`Transmit`], addressed to the previous path's remote, is returned.
    fn send_prev_path_challenge(
        &mut self,
        now: Instant,
        buf: &mut TransmitBuf<'_>,
        path_id: PathId,
    ) -> Option<Transmit> {
        let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
        if !prev_path.send_new_challenge {
            return None;
        };
        // Consume the request flag so the challenge is only sent once per request.
        prev_path.send_new_challenge = false;
        let destination = prev_path.remote;
        let token = self.rng.random();
        let info = paths::SentChallengeInfo {
            sent_instant: now,
            remote: destination,
        };
        // Remember the token together with when and where it was sent.
        prev_path.challenges_sent.insert(token, info);
        debug_assert_eq!(
            self.highest_space,
            SpaceId::Data,
            "PATH_CHALLENGE queued without 1-RTT keys"
        );
        buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);

        // This must be the first datagram in the buffer.
        debug_assert_eq!(buf.datagram_start_offset(), 0);
        let mut qlog = QlogSentPacket::default();
        // The packet is built with the CID stored alongside the previous path.
        let mut builder = PacketBuilder::new(
            now,
            SpaceId::Data,
            path_id,
            *prev_cid,
            buf,
            false,
            self,
            &mut qlog,
        )?;
        let challenge = frame::PathChallenge(token);
        trace!(%challenge, "validating previous path");
        qlog.frame(&Frame::PathChallenge(challenge));
        builder.frame_space_mut().write(challenge);
        self.stats.frame_tx.path_challenge += 1;

        // Pad the packet to `MIN_INITIAL_SIZE`.
        builder.pad_to(MIN_INITIAL_SIZE);

        builder.finish(self, now, qlog);
        self.stats.udp_tx.on_sent(1, buf.len());

        // Sent as a single datagram (no segmentation) and without an ECN codepoint.
        Some(Transmit {
            destination,
            size: buf.len(),
            ecn: None,
            segment_size: None,
            src_ip: self.local_ip,
        })
    }
1769
1770 fn space_can_send(
1775 &mut self,
1776 space_id: SpaceId,
1777 path_id: PathId,
1778 packet_size: usize,
1779 close: bool,
1780 ) -> SendableFrames {
1781 let pn = self.spaces[SpaceId::Data]
1782 .for_path(path_id)
1783 .peek_tx_number();
1784 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1785 if self.spaces[space_id].crypto.is_none()
1786 && (space_id != SpaceId::Data
1787 || self.zero_rtt_crypto.is_none()
1788 || self.side.is_server())
1789 {
1790 return SendableFrames::empty();
1792 }
1793 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1794 if space_id == SpaceId::Data {
1795 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1796 }
1797
1798 can_send.close = close && self.spaces[space_id].crypto.is_some();
1799
1800 can_send
1801 }
1802
    /// Processes a [`ConnectionEvent`] delivered to this connection.
    ///
    /// Two kinds of events are handled:
    /// - `Datagram`: an incoming datagram, which is decoded and processed together with any
    ///   `remaining` coalesced data.
    /// - `NewIdentifiers`: newly issued connection IDs, which are recorded in the local CID
    ///   state of their path and pushed onto the Data space's pending `new_cids`.
    pub fn handle_event(&mut self, event: ConnectionEvent) {
        use ConnectionEventInner::*;
        match event.0 {
            Datagram(DatagramConnectionEvent {
                now,
                remote,
                path_id,
                ecn,
                first_decode,
                remaining,
            }) => {
                let span = trace_span!("pkt", %path_id);
                let _guard = span.enter();
                // Drop datagrams whose remote differs from the known remote of the path,
                // unless `remote_may_migrate` allows it in the current state.
                if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
                    if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
                        trace!(
                            %path_id,
                            ?remote,
                            path_remote = ?self.path(path_id).map(|p| p.remote),
                            "discarding packet from unrecognized peer"
                        );
                        return;
                    }
                }

                // Sampled before processing; an unknown path counts as blocked.
                let was_anti_amplification_blocked = self
                    .path(path_id)
                    .map(|path| path.anti_amplification_blocked(1))
                    .unwrap_or(true);
                self.stats.udp_rx.datagrams += 1;
                self.stats.udp_rx.bytes += first_decode.len() as u64;
                let data_len = first_decode.len();

                self.handle_decode(now, remote, path_id, ecn, first_decode);
                // The path is looked up only after `handle_decode`, so the received bytes
                // are credited to whatever path state exists at this point.
                if let Some(path) = self.path_mut(path_id) {
                    path.inc_total_recvd(data_len as u64);
                }

                if let Some(data) = remaining {
                    self.stats.udp_rx.bytes += data.len() as u64;
                    self.handle_coalesced(now, remote, path_id, ecn, data);
                }

                if let Some(path) = self.paths.get_mut(&path_id) {
                    self.qlog
                        .emit_recovery_metrics(path_id, &mut path.data, now);
                }

                // Re-arm the loss detection timer if the path was blocked before this
                // datagram was processed.
                if was_anti_amplification_blocked {
                    self.set_loss_detection_timer(now, path_id);
                }
            }
            NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
                // All issued CIDs are expected to belong to the same path.
                let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
                debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
                let cid_state = self
                    .local_cid_state
                    .entry(path_id)
                    .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
                cid_state.new_cids(&ids, now);

                // The CIDs are pushed in reverse order onto the pending list.
                ids.into_iter().rev().for_each(|frame| {
                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
                });
                self.reset_cid_retirement(now);
            }
        }
    }
1889
    /// Processes timers that have expired as of `now`.
    ///
    /// Repeatedly takes the next timer returned by `expire_before(now, ..)` and dispatches
    /// on its kind: connection-wide timers ([`ConnTimer`]) and per-path timers
    /// ([`PathTimer`]).
    pub fn handle_timeout(&mut self, now: Instant) {
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            trace!(?timer, at=?now, "timeout");
            match timer {
                Timer::Conn(timer) => match timer {
                    ConnTimer::Close => {
                        // The close period is over: move to drained and tell the endpoint.
                        self.state.move_to_drained(None);
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    ConnTimer::Idle => {
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        trace!("sending keep-alive");
                        self.ping();
                    }
                    ConnTimer::KeyDiscard => {
                        // Drop both the 0-RTT keys and the previous 1-RTT keys.
                        self.zero_rtt_crypto = None;
                        self.prev_crypto = None;
                    }
                    ConnTimer::PushNewCid => {
                        // Handle every CID retirement that is due at or before `now`.
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    // Replacement CIDs are only requested while not closed.
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                },
                Timer::PerPath(path_id, timer) => {
                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
                    let _guard = span.enter();
                    match timer {
                        PathTimer::PathIdle => {
                            // Best effort: an error from closing the path is ignored.
                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
                                .ok();
                        }

                        PathTimer::PathKeepAlive => {
                            trace!("sending keep-alive on path");
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            // NOTE(review): panics if the path no longer exists at this point.
                            self.qlog.emit_recovery_metrics(
                                path_id,
                                &mut self.paths.get_mut(&path_id).unwrap().data,
                                now,
                            );
                        }
                        PathTimer::PathValidation => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path validation failed");
                            // Fall back to the previous path data, if there is any.
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            path.data.challenges_sent.clear();
                            path.data.send_new_challenge = false;
                        }
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!("path challenge deemed lost");
                            // Request that a fresh challenge be sent.
                            path.data.send_new_challenge = true;
                        }
                        PathTimer::PathOpen => {
                            let Some(path) = self.path_mut(path_id) else {
                                continue;
                            };
                            path.challenges_sent.clear();
                            path.send_new_challenge = false;
                            debug!("new path validation failed");
                            if let Err(err) = self.close_path(
                                now,
                                path_id,
                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
                            ) {
                                warn!(?err, "failed closing path");
                            }

                            // The event is emitted even if closing the path failed.
                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                                id: path_id,
                                error: PathError::ValidationFailed,
                            }));
                        }
                        PathTimer::Pacing => trace!("pacing timer expired"),
                        PathTimer::MaxAckDelay => {
                            trace!("max ack delay reached");
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::DiscardPath => {
                            // Stop all remaining timers of the path, retire every active
                            // local CID sequence number and then drop the path state.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
                                let (min_seq, max_seq) = loc_cid_state.active_seq();
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }
2042
2043 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2055 self.close_inner(
2056 now,
2057 Close::Application(frame::ApplicationClose { error_code, reason }),
2058 )
2059 }
2060
2061 fn close_inner(&mut self, now: Instant, reason: Close) {
2062 let was_closed = self.state.is_closed();
2063 if !was_closed {
2064 self.close_common();
2065 self.set_close_timer(now);
2066 self.close = true;
2067 self.state.move_to_closed_local(reason);
2068 }
2069 }
2070
2071 pub fn datagrams(&mut self) -> Datagrams<'_> {
2073 Datagrams { conn: self }
2074 }
2075
    /// Returns a copy of the connection-level statistics.
    pub fn stats(&mut self) -> ConnectionStats {
        self.stats.clone()
    }
2080
2081 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2083 let path = self.paths.get(&path_id)?;
2084 let stats = self.path_stats.entry(path_id).or_default();
2085 stats.rtt = path.data.rtt.get();
2086 stats.cwnd = path.data.congestion.window();
2087 stats.current_mtu = path.data.mtud.current_mtu();
2088 Some(*stats)
2089 }
2090
2091 pub fn ping(&mut self) {
2095 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2098 path_data.ping_pending = true;
2099 }
2100 }
2101
2102 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2106 let path_data = self.spaces[self.highest_space]
2107 .number_spaces
2108 .get_mut(&path)
2109 .ok_or(ClosedPath { _private: () })?;
2110 path_data.ping_pending = true;
2111 Ok(())
2112 }
2113
2114 pub fn force_key_update(&mut self) {
2118 if !self.state.is_established() {
2119 debug!("ignoring forced key update in illegal state");
2120 return;
2121 }
2122 if self.prev_crypto.is_some() {
2123 debug!("ignoring redundant forced key update");
2126 return;
2127 }
2128 self.update_keys(None, false);
2129 }
2130
    /// Deprecated alias that forwards to [`Self::force_key_update`].
    #[doc(hidden)]
    #[deprecated]
    pub fn initiate_key_update(&mut self) {
        self.force_key_update();
    }
2137
    /// Returns a reference to the connection's cryptographic session.
    pub fn crypto_session(&self) -> &dyn crypto::Session {
        &*self.crypto
    }
2142
    /// Returns whether the connection state reports that the handshake is in progress.
    pub fn is_handshaking(&self) -> bool {
        self.state.is_handshake()
    }
2150
    /// Returns whether the connection state reports closed.
    pub fn is_closed(&self) -> bool {
        self.state.is_closed()
    }
2161
    /// Returns whether the connection state reports drained.
    pub fn is_drained(&self) -> bool {
        self.state.is_drained()
    }
2169
    /// Returns the value of the connection's `accepted_0rtt` flag.
    pub fn accepted_0rtt(&self) -> bool {
        self.accepted_0rtt
    }
2176
    /// Returns whether 0-RTT is enabled for this connection (the `zero_rtt_enabled` flag).
    pub fn has_0rtt(&self) -> bool {
        self.zero_rtt_enabled
    }
2181
2182 pub fn has_pending_retransmits(&self) -> bool {
2184 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2185 }
2186
    /// Returns which [`Side`] of the connection this is.
    pub fn side(&self) -> Side {
        self.side.side()
    }
2191
2192 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2194 self.path(path_id)
2195 .map(|path_data| {
2196 path_data
2197 .last_observed_addr_report
2198 .as_ref()
2199 .map(|observed| observed.socket_addr())
2200 })
2201 .ok_or(ClosedPath { _private: () })
2202 }
2203
    /// Returns the local IP address stored for this connection, if any.
    ///
    /// The same value is used as `src_ip` of outgoing [`Transmit`]s.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.local_ip
    }
2216
2217 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2219 self.path(path_id).map(|d| d.rtt.get())
2220 }
2221
2222 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2224 self.path(path_id).map(|d| d.congestion.as_ref())
2225 }
2226
2227 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2232 self.streams.set_max_concurrent(dir, count);
2233 let pending = &mut self.spaces[SpaceId::Data].pending;
2236 self.streams.queue_max_stream_id(pending);
2237 }
2238
2239 pub fn set_max_concurrent_paths(
2249 &mut self,
2250 now: Instant,
2251 count: NonZeroU32,
2252 ) -> Result<(), MultipathNotNegotiated> {
2253 if !self.is_multipath_negotiated() {
2254 return Err(MultipathNotNegotiated { _private: () });
2255 }
2256 self.max_concurrent_paths = count;
2257
2258 let in_use_count = self
2259 .local_max_path_id
2260 .next()
2261 .saturating_sub(self.abandoned_paths.len() as u32)
2262 .as_u32();
2263 let extra_needed = count.get().saturating_sub(in_use_count);
2264 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2265
2266 self.set_max_path_id(now, new_max_path_id);
2267
2268 Ok(())
2269 }
2270
2271 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2273 if max_path_id <= self.local_max_path_id {
2274 return;
2275 }
2276
2277 self.local_max_path_id = max_path_id;
2278 self.spaces[SpaceId::Data].pending.max_path_id = true;
2279
2280 self.issue_first_path_cids(now);
2281 }
2282
    /// Returns the current maximum number of concurrent streams for direction `dir`.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }
2291
    /// Sets the send window; the value is forwarded to the stream state.
    pub fn set_send_window(&mut self, send_window: u64) {
        self.streams.set_send_window(send_window);
    }
2296
2297 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2299 if self.streams.set_receive_window(receive_window) {
2300 self.spaces[SpaceId::Data].pending.max_data = true;
2301 }
2302 }
2303
2304 pub fn is_multipath_negotiated(&self) -> bool {
2309 !self.is_handshaking()
2310 && self.config.max_concurrent_multipath_paths.is_some()
2311 && self.peer_params.initial_max_path_id.is_some()
2312 }
2313
2314 fn on_ack_received(
2315 &mut self,
2316 now: Instant,
2317 space: SpaceId,
2318 ack: frame::Ack,
2319 ) -> Result<(), TransportError> {
2320 let path = PathId::ZERO;
2322 self.inner_on_ack_received(now, space, path, ack)
2323 }
2324
2325 fn on_path_ack_received(
2326 &mut self,
2327 now: Instant,
2328 space: SpaceId,
2329 path_ack: frame::PathAck,
2330 ) -> Result<(), TransportError> {
2331 let (ack, path) = path_ack.into_ack();
2332 self.inner_on_ack_received(now, space, path, ack)
2333 }
2334
    /// Processes an acknowledgement for packets sent in `space` on `path`.
    ///
    /// Acknowledgements for abandoned paths are ignored. Otherwise newly acknowledged
    /// packets are removed from the sent packets, the congestion controller, MTU discovery
    /// and ACK frequency state are notified, the RTT estimate is updated when the ACK
    /// yields a new sample, loss detection is run, ECN feedback is processed and the loss
    /// detection timer is re-armed.
    ///
    /// # Errors
    ///
    /// Returns `PROTOCOL_VIOLATION` if the ACK acknowledges a packet number that has not
    /// been sent yet, and propagates any error from `check_ack` on an acknowledged range.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        if self.abandoned_paths.contains(&path) {
            trace!("silently ignoring PATH_ACK on abandoned path");
            return Ok(());
        }
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // Track whether this ACK advances the largest acknowledged packet number.
        let new_largest = {
            let space = &mut self.spaces[space].for_path(path);
            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
                space.largest_acked_packet = Some(ack.largest);
                // Record the send time only if the packet is still tracked as sent.
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        if self.detect_spurious_loss(&ack, space, path) {
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Collect only packet numbers that are both acknowledged and still tracked as sent.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                // The acknowledged packet recorded the largest packet numbers it had
                // acknowledged per path; pending ACKs below those are dropped.
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Let MTU discovery see the ACK and forward any MTU change to the
                // congestion controller.
                let path_data = self.path_data_mut(path);
                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, info);
            }
        }

        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
        let app_limited = self.app_limited;
        let path_data = self.path_data_mut(path);
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_ackd);

        // An RTT sample is only taken when the largest acknowledged packet advanced and at
        // least one newly acknowledged packet was ack-eliciting.
        if new_largest && ack_eliciting_acked {
            // Outside the Data space the ACK delay is taken as zero; in the Data space it
            // is capped at the peer's maximum ACK delay.
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space].for_path(path).largest_acked_packet_sent,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            path_data.rtt.update(ack_delay, rtt);
            // Remember the first packet number to be sent after the first RTT sample.
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
            }
        }

        self.detect_lost_packets(now, space, path, true);

        if self.peer_completed_address_validation(path) {
            self.path_data_mut(path).pto_count = 0;
        }

        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // ECN counts are only examined for ACKs that advanced the largest
                // acknowledged packet number.
                if new_largest {
                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // An ACK without ECN counts disables ECN on this path.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }
2484
2485 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2486 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2487
2488 if lost_packets.is_empty() {
2489 return false;
2490 }
2491
2492 for range in ack.iter() {
2493 let spurious_losses: Vec<u64> = lost_packets
2494 .iter_range(range.clone())
2495 .map(|(pn, _info)| pn)
2496 .collect();
2497
2498 for pn in spurious_losses {
2499 lost_packets.remove(pn);
2500 }
2501 }
2502
2503 lost_packets.is_empty()
2508 }
2509
2510 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2515 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2516
2517 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2518 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2519 }
2520
2521 fn process_ecn(
2523 &mut self,
2524 now: Instant,
2525 space: SpaceId,
2526 path: PathId,
2527 newly_acked: u64,
2528 ecn: frame::EcnCounts,
2529 largest_sent_time: Instant,
2530 ) {
2531 match self.spaces[space]
2532 .for_path(path)
2533 .detect_ecn(newly_acked, ecn)
2534 {
2535 Err(e) => {
2536 debug!("halting ECN due to verification failure: {}", e);
2537
2538 self.path_data_mut(path).sending_ecn = false;
2539 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2542 }
2543 Ok(false) => {}
2544 Ok(true) => {
2545 self.path_stats.entry(path).or_default().congestion_events += 1;
2546 self.path_data_mut(path).congestion.on_congestion_event(
2547 now,
2548 largest_sent_time,
2549 false,
2550 true,
2551 0,
2552 );
2553 }
2554 }
2555 }
2556
2557 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2560 self.paths
2561 .get_mut(&path_id)
2562 .expect("known path")
2563 .remove_in_flight(&info);
2564 let app_limited = self.app_limited;
2565 let path = self.path_data_mut(path_id);
2566 if info.ack_eliciting && !path.is_validating_path() {
2567 let rtt = path.rtt;
2570 path.congestion
2571 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2572 }
2573
2574 if let Some(retransmits) = info.retransmits.get() {
2576 for (id, _) in retransmits.reset_stream.iter() {
2577 self.streams.reset_acked(*id);
2578 }
2579 }
2580
2581 for frame in info.stream_frames {
2582 self.streams.received_ack_of(frame);
2583 }
2584 }
2585
2586 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2587 let start = if self.zero_rtt_crypto.is_some() {
2588 now
2589 } else {
2590 self.prev_crypto
2591 .as_ref()
2592 .expect("no previous keys")
2593 .end_packet
2594 .as_ref()
2595 .expect("update not acknowledged yet")
2596 .1
2597 };
2598
2599 self.timers.set(
2601 Timer::Conn(ConnTimer::KeyDiscard),
2602 start + self.pto_max_path(space, false) * 3,
2603 self.qlog.with_time(now),
2604 );
2605 }
2606
2607 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2620 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2621 self.detect_lost_packets(now, pn_space, path_id, false);
2623 self.set_loss_detection_timer(now, path_id);
2624 return;
2625 }
2626
2627 let (_, space) = match self.pto_time_and_space(now, path_id) {
2628 Some(x) => x,
2629 None => {
2630 error!(%path_id, "PTO expired while unset");
2631 return;
2632 }
2633 };
2634 trace!(
2635 in_flight = self.path_data(path_id).in_flight.bytes,
2636 count = self.path_data(path_id).pto_count,
2637 ?space,
2638 %path_id,
2639 "PTO fired"
2640 );
2641
2642 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2643 0 => {
2646 debug_assert!(!self.peer_completed_address_validation(path_id));
2647 1
2648 }
2649 _ => 2,
2651 };
2652 let pns = self.spaces[space].for_path(path_id);
2653 pns.loss_probes = pns.loss_probes.saturating_add(count);
2654 let path_data = self.path_data_mut(path_id);
2655 path_data.pto_count = path_data.pto_count.saturating_add(1);
2656 self.set_loss_detection_timer(now, path_id);
2657 }
2658
    /// Finds packets sent on `path_id` in `pn_space` that should be declared lost and hands
    /// them to [`Self::handle_lost_packets`].
    ///
    /// A packet below the largest acknowledged one is declared lost when it was sent at
    /// least `loss_delay` ago or when it trails the largest acknowledged packet by at least
    /// the configured packet threshold. For the first packet not yet lost, the space's
    /// `loss_time` is set to the time at which it will be. `due_to_ack` enables the
    /// persistent congestion check.
    ///
    /// # Panics
    ///
    /// Panics if no packet has been acknowledged yet on this path in `pn_space`.
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Time threshold: scaled conservative RTT, but never below the timer granularity.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // The persistent congestion period is always derived from the Data space PTO.
        let congestion_period = self
            .pto(SpaceId::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            // A gap in packet numbers restarts the persistent congestion window.
            if prev_packet != Some(packet.wrapping_sub(1)) {
                persistent_congestion_start = None;
            }

            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                if Some(packet) == in_flight_mtu_probe {
                    // A lost MTU probe is reported separately, not as a regular loss.
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // A later ack-eliciting loss more than `congestion_period`
                            // after the start of the window.
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // A window can only start at a packet after the one recorded in
                            // `first_packet_after_rtt_sample`.
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space, packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // Not lost yet: only the first such packet sets the loss time.
                if space.loss_time.is_none() {
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
2775
2776 fn discard_path(&mut self, path_id: PathId, now: Instant) {
2778 trace!(%path_id, "dropping path state");
2779 let path = self.path_data(path_id);
2780 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2781
2782 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2784 .for_path(path_id)
2785 .sent_packets
2786 .iter()
2787 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2788 .map(|(pn, info)| {
2789 size_of_lost_packets += info.size as u64;
2790 pn
2791 })
2792 .collect();
2793
2794 if !lost_pns.is_empty() {
2795 trace!(
2796 %path_id,
2797 count = lost_pns.len(),
2798 lost_bytes = size_of_lost_packets,
2799 "packets lost on path abandon"
2800 );
2801 self.handle_lost_packets(
2802 SpaceId::Data,
2803 path_id,
2804 now,
2805 lost_pns,
2806 in_flight_mtu_probe,
2807 Duration::ZERO,
2808 false,
2809 size_of_lost_packets,
2810 );
2811 }
2812 self.paths.remove(&path_id);
2813 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2814
2815 let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2816 self.events.push_back(
2817 PathEvent::Abandoned {
2818 id: path_id,
2819 path_stats,
2820 }
2821 .into(),
2822 );
2823 }
2824
    /// Applies the consequences of packets on `path_id` in `pn_space` having been lost.
    ///
    /// `lost_packets` must be sorted in ascending order. For each lost packet the in-flight
    /// accounting is updated, its stream frames and retransmittable data are queued again,
    /// MTU discovery is informed, and the packet is recorded in the space's `lost_packets`.
    /// A congestion event is reported if the lost packets reduced the bytes in flight.
    /// `lost_mtu_probe`, if set, is handled separately and does not count as a regular loss.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        // Forget old entries of previously lost packets first.
        self.drain_lost_packets(now, pn_space, path_id);

        if let Some(largest_lost) = lost_packets.last().cloned() {
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            // NOTE(review): panics if the largest lost packet is no longer in `sent_packets`.
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                // Packets that are no longer tracked as sent are skipped.
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the packet's contents for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Remember the loss together with the packet's send time.
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                // Propagate the current MTU and drop datagrams that no longer fit.
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                if let Some(max_datagram_size) = self.datagrams().max_size() {
                    self.datagrams.drop_oversized(max_datagram_size);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // Only losses that changed the bytes in flight trigger a congestion event.
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        if let Some(packet) = lost_mtu_probe {
            // The probe is always looked up in the Data space and must still be tracked
            // there; this panics otherwise.
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap();
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }
2942
2943 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2949 SpaceId::iter()
2950 .filter_map(|id| {
2951 self.spaces[id]
2952 .number_spaces
2953 .get(&path_id)
2954 .and_then(|pns| pns.loss_time)
2955 .map(|time| (time, id))
2956 })
2957 .min_by_key(|&(time, _)| time)
2958 }
2959
2960 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2962 let path = self.path(path_id)?;
2963 let pto_count = path.pto_count;
2964 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2965 let mut duration = path.rtt.pto_base() * backoff;
2966
2967 if path_id == PathId::ZERO
2968 && path.in_flight.ack_eliciting == 0
2969 && !self.peer_completed_address_validation(PathId::ZERO)
2970 {
2971 let space = match self.highest_space {
2977 SpaceId::Handshake => SpaceId::Handshake,
2978 _ => SpaceId::Initial,
2979 };
2980
2981 return Some((now + duration, space));
2982 }
2983
2984 let mut result = None;
2985 for space in SpaceId::iter() {
2986 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2987 continue;
2988 };
2989
2990 if !pns.has_in_flight() {
2991 continue;
2992 }
2993 if space == SpaceId::Data {
2994 if self.is_handshaking() {
2996 return result;
2997 }
2998 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3000 }
3001 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3002 continue;
3003 };
3004 let pto = last_ack_eliciting + duration;
3005 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3006 if path.anti_amplification_blocked(1) {
3007 continue;
3009 }
3010 if path.in_flight.ack_eliciting == 0 {
3011 continue;
3013 }
3014 result = Some((pto, space));
3015 }
3016 }
3017 result
3018 }
3019
3020 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3021 if self.side.is_server() || self.state.is_closed() {
3023 return true;
3024 }
3025 self.spaces[SpaceId::Handshake]
3028 .path_space(PathId::ZERO)
3029 .and_then(|pns| pns.largest_acked_packet)
3030 .is_some()
3031 || self.spaces[SpaceId::Data]
3032 .path_space(path)
3033 .and_then(|pns| pns.largest_acked_packet)
3034 .is_some()
3035 || (self.spaces[SpaceId::Data].crypto.is_some()
3036 && self.spaces[SpaceId::Handshake].crypto.is_none())
3037 }
3038
3039 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3047 if self.state.is_closed() {
3048 return;
3052 }
3053
3054 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3055 self.timers.set(
3057 Timer::PerPath(path_id, PathTimer::LossDetection),
3058 loss_time,
3059 self.qlog.with_time(now),
3060 );
3061 return;
3062 }
3063
3064 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3067 self.timers.set(
3068 Timer::PerPath(path_id, PathTimer::LossDetection),
3069 timeout,
3070 self.qlog.with_time(now),
3071 );
3072 } else {
3073 self.timers.stop(
3074 Timer::PerPath(path_id, PathTimer::LossDetection),
3075 self.qlog.with_time(now),
3076 );
3077 }
3078 }
3079
3080 fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3086 match space {
3087 SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3088 SpaceId::Data => self
3089 .paths
3090 .iter()
3091 .filter_map(|(path_id, state)| {
3092 if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3093 None
3095 } else {
3096 let pto = self.pto(space, *path_id);
3097 Some(pto)
3098 }
3099 })
3100 .max()
3101 .expect("there should be one at least path"),
3102 }
3103 }
3104
3105 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3110 let max_ack_delay = match space {
3111 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3112 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3113 };
3114 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3115 }
3116
3117 fn on_packet_authenticated(
3118 &mut self,
3119 now: Instant,
3120 space_id: SpaceId,
3121 path_id: PathId,
3122 ecn: Option<EcnCodepoint>,
3123 packet: Option<u64>,
3124 spin: bool,
3125 is_1rtt: bool,
3126 ) {
3127 self.total_authed_packets += 1;
3128 if let Some(last_allowed_receive) = self
3129 .paths
3130 .get(&path_id)
3131 .and_then(|path| path.data.last_allowed_receive)
3132 {
3133 if now > last_allowed_receive {
3134 warn!("received data on path which we abandoned more than 3 * PTO ago");
3135 if !self.state.is_closed() {
3137 self.state.move_to_closed(TransportError::NO_ERROR(
3139 "peer failed to respond with PATH_ABANDON in time",
3140 ));
3141 self.close_common();
3142 self.set_close_timer(now);
3143 self.close = true;
3144 }
3145 return;
3146 }
3147 }
3148
3149 self.reset_keep_alive(path_id, now);
3150 self.reset_idle_timeout(now, space_id, path_id);
3151 self.permit_idle_reset = true;
3152 self.receiving_ecn |= ecn.is_some();
3153 if let Some(x) = ecn {
3154 let space = &mut self.spaces[space_id];
3155 space.for_path(path_id).ecn_counters += x;
3156
3157 if x.is_ce() {
3158 space
3159 .for_path(path_id)
3160 .pending_acks
3161 .set_immediate_ack_required();
3162 }
3163 }
3164
3165 let packet = match packet {
3166 Some(x) => x,
3167 None => return,
3168 };
3169 match &self.side {
3170 ConnectionSide::Client { .. } => {
3171 if space_id == SpaceId::Handshake {
3175 if let Some(hs) = self.state.as_handshake_mut() {
3176 hs.allow_server_migration = false;
3177 }
3178 }
3179 }
3180 ConnectionSide::Server { .. } => {
3181 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3182 {
3183 self.discard_space(now, SpaceId::Initial);
3185 }
3186 if self.zero_rtt_crypto.is_some() && is_1rtt {
3187 self.set_key_discard_timer(now, space_id)
3189 }
3190 }
3191 }
3192 let space = self.spaces[space_id].for_path(path_id);
3193 space.pending_acks.insert_one(packet, now);
3194 if packet >= space.rx_packet.unwrap_or_default() {
3195 space.rx_packet = Some(packet);
3196 self.spin = self.side.is_client() ^ spin;
3198 }
3199 }
3200
3201 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3206 if let Some(timeout) = self.idle_timeout {
3208 if self.state.is_closed() {
3209 self.timers
3210 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3211 } else {
3212 let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3213 self.timers.set(
3214 Timer::Conn(ConnTimer::Idle),
3215 now + dt,
3216 self.qlog.with_time(now),
3217 );
3218 }
3219 }
3220
3221 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3223 if self.state.is_closed() {
3224 self.timers.stop(
3225 Timer::PerPath(path_id, PathTimer::PathIdle),
3226 self.qlog.with_time(now),
3227 );
3228 } else {
3229 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3230 self.timers.set(
3231 Timer::PerPath(path_id, PathTimer::PathIdle),
3232 now + dt,
3233 self.qlog.with_time(now),
3234 );
3235 }
3236 }
3237 }
3238
3239 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3241 if !self.state.is_established() {
3242 return;
3243 }
3244
3245 if let Some(interval) = self.config.keep_alive_interval {
3246 self.timers.set(
3247 Timer::Conn(ConnTimer::KeepAlive),
3248 now + interval,
3249 self.qlog.with_time(now),
3250 );
3251 }
3252
3253 if let Some(interval) = self.path_data(path_id).keep_alive {
3254 self.timers.set(
3255 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3256 now + interval,
3257 self.qlog.with_time(now),
3258 );
3259 }
3260 }
3261
3262 fn reset_cid_retirement(&mut self, now: Instant) {
3264 if let Some((_path, t)) = self.next_cid_retirement() {
3265 self.timers.set(
3266 Timer::Conn(ConnTimer::PushNewCid),
3267 t,
3268 self.qlog.with_time(now),
3269 );
3270 }
3271 }
3272
3273 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3275 self.local_cid_state
3276 .iter()
3277 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3278 .min_by_key(|(_path_id, timeout)| *timeout)
3279 }
3280
3281 pub(crate) fn handle_first_packet(
3286 &mut self,
3287 now: Instant,
3288 remote: SocketAddr,
3289 ecn: Option<EcnCodepoint>,
3290 packet_number: u64,
3291 packet: InitialPacket,
3292 remaining: Option<BytesMut>,
3293 ) -> Result<(), ConnectionError> {
3294 let span = trace_span!("first recv");
3295 let _guard = span.enter();
3296 debug_assert!(self.side.is_server());
3297 let len = packet.header_data.len() + packet.payload.len();
3298 let path_id = PathId::ZERO;
3299 self.path_data_mut(path_id).total_recvd = len as u64;
3300
3301 if let Some(hs) = self.state.as_handshake_mut() {
3302 hs.expected_token = packet.header.token.clone();
3303 } else {
3304 unreachable!("first packet must be delivered in Handshake state");
3305 }
3306
3307 self.on_packet_authenticated(
3309 now,
3310 SpaceId::Initial,
3311 path_id,
3312 ecn,
3313 Some(packet_number),
3314 false,
3315 false,
3316 );
3317
3318 let packet: Packet = packet.into();
3319
3320 let mut qlog = QlogRecvPacket::new(len);
3321 qlog.header(&packet.header, Some(packet_number), path_id);
3322
3323 self.process_decrypted_packet(
3324 now,
3325 remote,
3326 path_id,
3327 Some(packet_number),
3328 packet,
3329 &mut qlog,
3330 )?;
3331 self.qlog.emit_packet_received(qlog, now);
3332 if let Some(data) = remaining {
3333 self.handle_coalesced(now, remote, path_id, ecn, data);
3334 }
3335
3336 self.qlog.emit_recovery_metrics(
3337 path_id,
3338 &mut self.paths.get_mut(&path_id).unwrap().data,
3339 now,
3340 );
3341
3342 Ok(())
3343 }
3344
3345 fn init_0rtt(&mut self, now: Instant) {
3346 let (header, packet) = match self.crypto.early_crypto() {
3347 Some(x) => x,
3348 None => return,
3349 };
3350 if self.side.is_client() {
3351 match self.crypto.transport_parameters() {
3352 Ok(params) => {
3353 let params = params
3354 .expect("crypto layer didn't supply transport parameters with ticket");
3355 let params = TransportParameters {
3357 initial_src_cid: None,
3358 original_dst_cid: None,
3359 preferred_address: None,
3360 retry_src_cid: None,
3361 stateless_reset_token: None,
3362 min_ack_delay: None,
3363 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3364 max_ack_delay: TransportParameters::default().max_ack_delay,
3365 initial_max_path_id: None,
3366 ..params
3367 };
3368 self.set_peer_params(params);
3369 self.qlog.emit_peer_transport_params_restored(self, now);
3370 }
3371 Err(e) => {
3372 error!("session ticket has malformed transport parameters: {}", e);
3373 return;
3374 }
3375 }
3376 }
3377 trace!("0-RTT enabled");
3378 self.zero_rtt_enabled = true;
3379 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3380 }
3381
3382 fn read_crypto(
3383 &mut self,
3384 space: SpaceId,
3385 crypto: &frame::Crypto,
3386 payload_len: usize,
3387 ) -> Result<(), TransportError> {
3388 let expected = if !self.state.is_handshake() {
3389 SpaceId::Data
3390 } else if self.highest_space == SpaceId::Initial {
3391 SpaceId::Initial
3392 } else {
3393 SpaceId::Handshake
3396 };
3397 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3401
3402 let end = crypto.offset + crypto.data.len() as u64;
3403 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3404 warn!(
3405 "received new {:?} CRYPTO data when expecting {:?}",
3406 space, expected
3407 );
3408 return Err(TransportError::PROTOCOL_VIOLATION(
3409 "new data at unexpected encryption level",
3410 ));
3411 }
3412
3413 let space = &mut self.spaces[space];
3414 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3415 if max > self.config.crypto_buffer_size as u64 {
3416 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3417 }
3418
3419 space
3420 .crypto_stream
3421 .insert(crypto.offset, crypto.data.clone(), payload_len);
3422 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3423 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3424 if self.crypto.read_handshake(&chunk.bytes)? {
3425 self.events.push_back(Event::HandshakeDataReady);
3426 }
3427 }
3428
3429 Ok(())
3430 }
3431
3432 fn write_crypto(&mut self) {
3433 loop {
3434 let space = self.highest_space;
3435 let mut outgoing = Vec::new();
3436 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3437 match space {
3438 SpaceId::Initial => {
3439 self.upgrade_crypto(SpaceId::Handshake, crypto);
3440 }
3441 SpaceId::Handshake => {
3442 self.upgrade_crypto(SpaceId::Data, crypto);
3443 }
3444 _ => unreachable!("got updated secrets during 1-RTT"),
3445 }
3446 }
3447 if outgoing.is_empty() {
3448 if space == self.highest_space {
3449 break;
3450 } else {
3451 continue;
3453 }
3454 }
3455 let offset = self.spaces[space].crypto_offset;
3456 let outgoing = Bytes::from(outgoing);
3457 if let Some(hs) = self.state.as_handshake_mut() {
3458 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3459 hs.client_hello = Some(outgoing.clone());
3460 }
3461 }
3462 self.spaces[space].crypto_offset += outgoing.len() as u64;
3463 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3464 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3465 offset,
3466 data: outgoing,
3467 });
3468 }
3469 }
3470
3471 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3473 debug_assert!(
3474 self.spaces[space].crypto.is_none(),
3475 "already reached packet space {space:?}"
3476 );
3477 trace!("{:?} keys ready", space);
3478 if space == SpaceId::Data {
3479 self.next_crypto = Some(
3481 self.crypto
3482 .next_1rtt_keys()
3483 .expect("handshake should be complete"),
3484 );
3485 }
3486
3487 self.spaces[space].crypto = Some(crypto);
3488 debug_assert!(space as usize > self.highest_space as usize);
3489 self.highest_space = space;
3490 if space == SpaceId::Data && self.side.is_client() {
3491 self.zero_rtt_crypto = None;
3493 }
3494 }
3495
3496 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3497 debug_assert!(space_id != SpaceId::Data);
3498 trace!("discarding {:?} keys", space_id);
3499 if space_id == SpaceId::Initial {
3500 if let ConnectionSide::Client { token, .. } = &mut self.side {
3502 *token = Bytes::new();
3503 }
3504 }
3505 let space = &mut self.spaces[space_id];
3506 space.crypto = None;
3507 let pns = space.for_path(PathId::ZERO);
3508 pns.time_of_last_ack_eliciting_packet = None;
3509 pns.loss_time = None;
3510 pns.loss_probes = 0;
3511 let sent_packets = mem::take(&mut pns.sent_packets);
3512 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3513 for (_, packet) in sent_packets.into_iter() {
3514 path.data.remove_in_flight(&packet);
3515 }
3516
3517 self.set_loss_detection_timer(now, PathId::ZERO)
3518 }
3519
3520 fn handle_coalesced(
3521 &mut self,
3522 now: Instant,
3523 remote: SocketAddr,
3524 path_id: PathId,
3525 ecn: Option<EcnCodepoint>,
3526 data: BytesMut,
3527 ) {
3528 self.path_data_mut(path_id)
3529 .inc_total_recvd(data.len() as u64);
3530 let mut remaining = Some(data);
3531 let cid_len = self
3532 .local_cid_state
3533 .values()
3534 .map(|cid_state| cid_state.cid_len())
3535 .next()
3536 .expect("one cid_state must exist");
3537 while let Some(data) = remaining {
3538 match PartialDecode::new(
3539 data,
3540 &FixedLengthConnectionIdParser::new(cid_len),
3541 &[self.version],
3542 self.endpoint_config.grease_quic_bit,
3543 ) {
3544 Ok((partial_decode, rest)) => {
3545 remaining = rest;
3546 self.handle_decode(now, remote, path_id, ecn, partial_decode);
3547 }
3548 Err(e) => {
3549 trace!("malformed header: {}", e);
3550 return;
3551 }
3552 }
3553 }
3554 }
3555
3556 fn handle_decode(
3557 &mut self,
3558 now: Instant,
3559 remote: SocketAddr,
3560 path_id: PathId,
3561 ecn: Option<EcnCodepoint>,
3562 partial_decode: PartialDecode,
3563 ) {
3564 let qlog = QlogRecvPacket::new(partial_decode.len());
3565 if let Some(decoded) = packet_crypto::unprotect_header(
3566 partial_decode,
3567 &self.spaces,
3568 self.zero_rtt_crypto.as_ref(),
3569 self.peer_params.stateless_reset_token,
3570 ) {
3571 self.handle_packet(
3572 now,
3573 remote,
3574 path_id,
3575 ecn,
3576 decoded.packet,
3577 decoded.stateless_reset,
3578 qlog,
3579 );
3580 }
3581 }
3582
3583 fn handle_packet(
3584 &mut self,
3585 now: Instant,
3586 remote: SocketAddr,
3587 path_id: PathId,
3588 ecn: Option<EcnCodepoint>,
3589 packet: Option<Packet>,
3590 stateless_reset: bool,
3591 mut qlog: QlogRecvPacket,
3592 ) {
3593 self.stats.udp_rx.ios += 1;
3594 if let Some(ref packet) = packet {
3595 trace!(
3596 "got {:?} packet ({} bytes) from {} using id {}",
3597 packet.header.space(),
3598 packet.payload.len() + packet.header_data.len(),
3599 remote,
3600 packet.header.dst_cid(),
3601 );
3602 }
3603
3604 if self.is_handshaking() {
3605 if path_id != PathId::ZERO {
3606 debug!(%remote, %path_id, "discarding multipath packet during handshake");
3607 return;
3608 }
3609 if remote != self.path_data_mut(path_id).remote {
3610 if let Some(hs) = self.state.as_handshake() {
3611 if hs.allow_server_migration {
3612 trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
3613 self.path_data_mut(path_id).remote = remote;
3614 self.qlog.emit_tuple_assigned(path_id, remote, now);
3615 } else {
3616 debug!("discarding packet with unexpected remote during handshake");
3617 return;
3618 }
3619 } else {
3620 debug!("discarding packet with unexpected remote during handshake");
3621 return;
3622 }
3623 }
3624 }
3625
3626 let was_closed = self.state.is_closed();
3627 let was_drained = self.state.is_drained();
3628
3629 let decrypted = match packet {
3630 None => Err(None),
3631 Some(mut packet) => self
3632 .decrypt_packet(now, path_id, &mut packet)
3633 .map(move |number| (packet, number)),
3634 };
3635 let result = match decrypted {
3636 _ if stateless_reset => {
3637 debug!("got stateless reset");
3638 Err(ConnectionError::Reset)
3639 }
3640 Err(Some(e)) => {
3641 warn!("illegal packet: {}", e);
3642 Err(e.into())
3643 }
3644 Err(None) => {
3645 debug!("failed to authenticate packet");
3646 self.authentication_failures += 1;
3647 let integrity_limit = self.spaces[self.highest_space]
3648 .crypto
3649 .as_ref()
3650 .unwrap()
3651 .packet
3652 .local
3653 .integrity_limit();
3654 if self.authentication_failures > integrity_limit {
3655 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3656 } else {
3657 return;
3658 }
3659 }
3660 Ok((packet, number)) => {
3661 qlog.header(&packet.header, number, path_id);
3662 let span = match number {
3663 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3664 None => trace_span!("recv", space = ?packet.header.space()),
3665 };
3666 let _guard = span.enter();
3667
3668 let dedup = self.spaces[packet.header.space()]
3669 .path_space_mut(path_id)
3670 .map(|pns| &mut pns.dedup);
3671 if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3672 debug!("discarding possible duplicate packet");
3673 self.qlog.emit_packet_received(qlog, now);
3674 return;
3675 } else if self.state.is_handshake() && packet.header.is_short() {
3676 trace!("dropping short packet during handshake");
3678 self.qlog.emit_packet_received(qlog, now);
3679 return;
3680 } else {
3681 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3682 if let Some(hs) = self.state.as_handshake() {
3683 if self.side.is_server() && token != &hs.expected_token {
3684 warn!("discarding Initial with invalid retry token");
3688 self.qlog.emit_packet_received(qlog, now);
3689 return;
3690 }
3691 }
3692 }
3693
3694 if !self.state.is_closed() {
3695 let spin = match packet.header {
3696 Header::Short { spin, .. } => spin,
3697 _ => false,
3698 };
3699
3700 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3701 self.ensure_path(path_id, remote, now, number);
3703 }
3704 if self.paths.contains_key(&path_id) {
3705 self.on_packet_authenticated(
3706 now,
3707 packet.header.space(),
3708 path_id,
3709 ecn,
3710 number,
3711 spin,
3712 packet.header.is_1rtt(),
3713 );
3714 }
3715 }
3716
3717 let res = self
3718 .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);
3719
3720 self.qlog.emit_packet_received(qlog, now);
3721 res
3722 }
3723 }
3724 };
3725
3726 if let Err(conn_err) = result {
3728 match conn_err {
3729 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3730 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3731 ConnectionError::Reset
3732 | ConnectionError::TransportError(TransportError {
3733 code: TransportErrorCode::AEAD_LIMIT_REACHED,
3734 ..
3735 }) => {
3736 self.state.move_to_drained(Some(conn_err));
3737 }
3738 ConnectionError::TimedOut => {
3739 unreachable!("timeouts aren't generated by packet processing");
3740 }
3741 ConnectionError::TransportError(err) => {
3742 debug!("closing connection due to transport error: {}", err);
3743 self.state.move_to_closed(err);
3744 }
3745 ConnectionError::VersionMismatch => {
3746 self.state.move_to_draining(Some(conn_err));
3747 }
3748 ConnectionError::LocallyClosed => {
3749 unreachable!("LocallyClosed isn't generated by packet processing");
3750 }
3751 ConnectionError::CidsExhausted => {
3752 unreachable!("CidsExhausted isn't generated by packet processing");
3753 }
3754 };
3755 }
3756
3757 if !was_closed && self.state.is_closed() {
3758 self.close_common();
3759 if !self.state.is_drained() {
3760 self.set_close_timer(now);
3761 }
3762 }
3763 if !was_drained && self.state.is_drained() {
3764 self.endpoint_events.push_back(EndpointEventInner::Drained);
3765 self.timers
3768 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3769 }
3770
3771 if matches!(self.state.as_type(), StateType::Closed) {
3773 let path_remote = self
3777 .paths
3778 .get(&path_id)
3779 .map(|p| p.data.remote)
3780 .unwrap_or(remote);
3781 self.close = remote == path_remote;
3782 }
3783 }
3784
/// Processes a decrypted packet according to the current connection state.
///
/// * Packets for a path that is not in `self.paths` are ignored.
/// * Established: Data packets go to `process_payload`, other spaces that carry frames go
///   to `process_early_payload`, anything else is dropped.
/// * Closed: frames are only scanned for a close frame, which moves the state to draining.
/// * Draining / Drained: the packet is ignored.
/// * Handshake: the packet is handled according to its header type (Retry, Handshake,
///   Initial, 0-RTT, Version Negotiation).
///
/// # Errors
///
/// Returns the error produced by payload processing, by transport parameter handling, a
/// `PROTOCOL_VIOLATION` when a server receives a Retry, or
/// `ConnectionError::VersionMismatch` when a Version Negotiation packet does not list our
/// version.
3785 fn process_decrypted_packet(
3786 &mut self,
3787 now: Instant,
3788 remote: SocketAddr,
3789 path_id: PathId,
3790 number: Option<u64>,
3791 packet: Packet,
3792 qlog: &mut QlogRecvPacket,
3793 ) -> Result<(), ConnectionError> {
3794 if !self.paths.contains_key(&path_id) {
3795 trace!(%path_id, ?number, "discarding packet for unknown path");
3799 return Ok(());
3800 }
// Every state except Handshake is fully handled (and returns) inside this match; only the
// Handshake state yields a value, namely the mutable handshake state.
3801 let state = match self.state.as_type() {
3802 StateType::Established => {
3803 match packet.header.space() {
3804 SpaceId::Data => {
3805 self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
3806 }
3807 _ if packet.header.has_frames() => {
3808 self.process_early_payload(now, path_id, packet, qlog)?
3809 }
3810 _ => {
3811 trace!("discarding unexpected pre-handshake packet");
3812 }
3813 }
3814 return Ok(());
3815 }
3816 StateType::Closed => {
// Frames that fail to decode are logged and skipped instead of raising an error.
3817 for result in frame::Iter::new(packet.payload.freeze())? {
3818 let frame = match result {
3819 Ok(frame) => frame,
3820 Err(err) => {
3821 debug!("frame decoding error: {err:?}");
3822 continue;
3823 }
3824 };
3825 qlog.frame(&frame);
3826
3827 if let Frame::Padding = frame {
3828 continue;
3829 };
3830
3831 self.stats.frame_rx.record(&frame);
3832
// The first close frame moves the connection to draining; later frames are not looked at.
3833 if let Frame::Close(_error) = frame {
3834 self.state.move_to_draining(None);
3835 break;
3836 }
3837 }
3838 return Ok(());
3839 }
3840 StateType::Draining | StateType::Drained => return Ok(()),
3841 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3842 };
3843
3844 match packet.header {
3845 Header::Retry {
3846 src_cid: rem_cid, ..
3847 } => {
3848 debug_assert_eq!(path_id, PathId::ZERO);
3849 if self.side.is_server() {
3850 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3851 }
3852
// Validity is checked against the currently active remote CID of this path; a missing CID
// entry counts as invalid (`unwrap_or_default`).
3853 let is_valid_retry = self
3854 .rem_cids
3855 .get(&path_id)
3856 .map(|cids| cids.active())
3857 .map(|orig_dst_cid| {
3858 self.crypto.is_valid_retry(
3859 orig_dst_cid,
3860 &packet.header_data,
3861 &packet.payload,
3862 )
3863 })
3864 .unwrap_or_default();
// A Retry is ignored if another packet was already authenticated, if its payload is not
// longer than 16 bytes, or if the validity check failed.
3865 if self.total_authed_packets > 1
3866 || packet.payload.len() <= 16 || !is_valid_retry
3868 {
3869 trace!("discarding invalid Retry");
3870 return Ok(());
3878 }
3879
3880 trace!("retrying with CID {}", rem_cid);
3881 let client_hello = state.client_hello.take().unwrap();
3882 self.retry_src_cid = Some(rem_cid);
3883 self.rem_cids
3884 .get_mut(&path_id)
3885 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3886 .update_initial_cid(rem_cid);
3887 self.rem_handshake_cid = rem_cid;
3888
// Initial packet number 0, if still tracked as sent, is handled as if it had been acked.
3889 let space = &mut self.spaces[SpaceId::Initial];
3890 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3891 self.on_packet_acked(now, PathId::ZERO, info);
3892 };
3893
// The Initial space is discarded and rebuilt with keys derived from the new remote CID.
// The next packet number is carried over and the stored ClientHello is queued again from
// offset 0.
3894 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = {
3897 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3898 space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3899 space.crypto_offset = client_hello.len() as u64;
3900 space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3901 .for_path(path_id)
3902 .next_packet_number;
3903 space.pending.crypto.push_back(frame::Crypto {
3904 offset: 0,
3905 data: client_hello,
3906 });
3907 space
3908 };
3909
// Packets already sent in the Data space on path zero are removed from the in-flight
// accounting and their retransmittable content is queued again.
3910 let zero_rtt = mem::take(
3912 &mut self.spaces[SpaceId::Data]
3913 .for_path(PathId::ZERO)
3914 .sent_packets,
3915 );
3916 for (_, info) in zero_rtt.into_iter() {
3917 self.paths
3918 .get_mut(&PathId::ZERO)
3919 .unwrap()
3920 .remove_in_flight(&info);
3921 self.spaces[SpaceId::Data].pending |= info.retransmits;
3922 }
3923 self.streams.retransmit_all_for_0rtt();
3924
// The token is the payload without its final 16 bytes (presumably the Retry integrity
// tag; confirm against the packet format).
3925 let token_len = packet.payload.len() - 16;
3926 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3927 unreachable!("we already short-circuited if we're server");
3928 };
3929 *token = packet.payload.freeze().split_to(token_len);
3930
// Start over with a fresh handshake state.
3931 self.state = State::handshake(state::Handshake {
3932 expected_token: Bytes::new(),
3933 rem_cid_set: false,
3934 client_hello: None,
3935 allow_server_migration: true,
3936 });
3937 Ok(())
3938 }
3939 Header::Long {
3940 ty: LongType::Handshake,
3941 src_cid: rem_cid,
3942 dst_cid: loc_cid,
3943 ..
3944 } => {
3945 debug_assert_eq!(path_id, PathId::ZERO);
3946 if rem_cid != self.rem_handshake_cid {
3947 debug!(
3948 "discarding packet with mismatched remote CID: {} != {}",
3949 self.rem_handshake_cid, rem_cid
3950 );
3951 return Ok(());
3952 }
3953 self.on_path_validated(path_id);
3954
3955 self.process_early_payload(now, path_id, packet, qlog)?;
3956 if self.state.is_closed() {
3957 return Ok(());
3958 }
3959
// Nothing more to do until the crypto layer reports that the handshake is finished.
3960 if self.crypto.is_handshaking() {
3961 trace!("handshake ongoing");
3962 return Ok(());
3963 }
3964
3965 if self.side.is_client() {
3966 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3968 TransportError::new(
3969 TransportErrorCode::crypto(0x6d),
3970 "transport parameters missing".to_owned(),
3971 )
3972 })?;
3973
3974 if self.has_0rtt() {
3975 if !self.crypto.early_data_accepted().unwrap() {
3976 debug_assert!(self.side.is_client());
3977 debug!("0-RTT rejected");
3978 self.accepted_0rtt = false;
3979 self.streams.zero_rtt_rejected();
3980
// On rejection the pending retransmits of the Data space are reset and its sent packets
// on this path are removed from the in-flight accounting.
3981 self.spaces[SpaceId::Data].pending = Retransmits::default();
3983
3984 let sent_packets = mem::take(
3986 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3987 );
3988 for (_, packet) in sent_packets.into_iter() {
3989 self.paths
3990 .get_mut(&path_id)
3991 .unwrap()
3992 .remove_in_flight(&packet);
3993 }
3994 } else {
// On acceptance the new parameters are validated against the ones stored in `peer_params`.
3995 self.accepted_0rtt = true;
3996 params.validate_resumption_from(&self.peer_params)?;
3997 }
3998 }
3999 if let Some(token) = params.stateless_reset_token {
4000 let remote = self.path_data(path_id).remote;
4001 self.endpoint_events
4002 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
4003 }
4004 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4005 self.issue_first_cids(now);
4006 } else {
// Server side: queue HANDSHAKE_DONE, drop the Handshake space and report confirmation.
4007 self.spaces[SpaceId::Data].pending.handshake_done = true;
4009 self.discard_space(now, SpaceId::Handshake);
4010 self.events.push_back(Event::HandshakeConfirmed);
4011 trace!("handshake confirmed");
4012 }
4013
4014 self.events.push_back(Event::Connected);
4015 self.state.move_to_established();
4016 trace!("established");
4017
4018 self.issue_first_path_cids(now);
4021 Ok(())
4022 }
4023 Header::Initial(InitialHeader {
4024 src_cid: rem_cid,
4025 dst_cid: loc_cid,
4026 ..
4027 }) => {
4028 debug_assert_eq!(path_id, PathId::ZERO);
// The first Initial fixes the remote CID; later Initials must carry the same one.
4029 if !state.rem_cid_set {
4030 trace!("switching remote CID to {}", rem_cid);
4031 let mut state = state.clone();
4032 self.rem_cids
4033 .get_mut(&path_id)
4034 .expect("PathId::ZERO not yet abandoned")
4035 .update_initial_cid(rem_cid);
4036 self.rem_handshake_cid = rem_cid;
4037 self.orig_rem_cid = rem_cid;
4038 state.rem_cid_set = true;
4039 self.state.move_to_handshake(state);
4040 } else if rem_cid != self.rem_handshake_cid {
4041 debug!(
4042 "discarding packet with mismatched remote CID: {} != {}",
4043 self.rem_handshake_cid, rem_cid
4044 );
4045 return Ok(());
4046 }
4047
4048 let starting_space = self.highest_space;
4049 self.process_early_payload(now, path_id, packet, qlog)?;
4050
// On the server, if processing this packet moved the highest space beyond Initial, the
// peer's transport parameters are read and applied, the first CIDs issued and 0-RTT set up.
4051 if self.side.is_server()
4052 && starting_space == SpaceId::Initial
4053 && self.highest_space != SpaceId::Initial
4054 {
4055 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4056 TransportError::new(
4057 TransportErrorCode::crypto(0x6d),
4058 "transport parameters missing".to_owned(),
4059 )
4060 })?;
4061 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4062 self.issue_first_cids(now);
4063 self.init_0rtt(now);
4064 }
4065 Ok(())
4066 }
4067 Header::Long {
4068 ty: LongType::ZeroRtt,
4069 ..
4070 } => {
4071 self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
4072 Ok(())
4073 }
4074 Header::VersionNegotiate { .. } => {
// Version Negotiation is ignored once more than one packet has been authenticated.
4075 if self.total_authed_packets > 1 {
4076 return Ok(());
4077 }
// The payload is read as a list of 4-byte big-endian versions; a trailing chunk shorter
// than 4 bytes never matches.
4078 let supported = packet
4079 .payload
4080 .chunks(4)
4081 .any(|x| match <[u8; 4]>::try_from(x) {
4082 Ok(version) => self.version == u32::from_be_bytes(version),
4083 Err(_) => false,
4084 });
// A list that contains our own version is ignored.
4085 if supported {
4086 return Ok(());
4087 }
4088 debug!("remote doesn't support our version");
4089 Err(ConnectionError::VersionMismatch)
4090 }
4091 Header::Short { .. } => unreachable!(
4092 "short packets received during handshake are discarded in handle_packet"
4093 ),
4094 }
4095 }
4096
4097 fn process_early_payload(
4099 &mut self,
4100 now: Instant,
4101 path_id: PathId,
4102 packet: Packet,
4103 #[allow(unused)] qlog: &mut QlogRecvPacket,
4104 ) -> Result<(), TransportError> {
4105 debug_assert_ne!(packet.header.space(), SpaceId::Data);
4106 debug_assert_eq!(path_id, PathId::ZERO);
4107 let payload_len = packet.payload.len();
4108 let mut ack_eliciting = false;
4109 for result in frame::Iter::new(packet.payload.freeze())? {
4110 let frame = result?;
4111 qlog.frame(&frame);
4112 let span = match frame {
4113 Frame::Padding => continue,
4114 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4115 };
4116
4117 self.stats.frame_rx.record(&frame);
4118
4119 let _guard = span.as_ref().map(|x| x.enter());
4120 ack_eliciting |= frame.is_ack_eliciting();
4121
4122 if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4124 return Err(TransportError::PROTOCOL_VIOLATION(
4125 "illegal frame type in handshake",
4126 ));
4127 }
4128
4129 match frame {
4130 Frame::Padding | Frame::Ping => {}
4131 Frame::Crypto(frame) => {
4132 self.read_crypto(packet.header.space(), &frame, payload_len)?;
4133 }
4134 Frame::Ack(ack) => {
4135 self.on_ack_received(now, packet.header.space(), ack)?;
4136 }
4137 Frame::PathAck(ack) => {
4138 span.as_ref()
4139 .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4140 self.on_path_ack_received(now, packet.header.space(), ack)?;
4141 }
4142 Frame::Close(reason) => {
4143 self.state.move_to_draining(Some(reason.into()));
4144 return Ok(());
4145 }
4146 _ => {
4147 let mut err =
4148 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4149 err.frame = frame::MaybeFrame::Known(frame.ty());
4150 return Err(err);
4151 }
4152 }
4153 }
4154
4155 if ack_eliciting {
4156 self.spaces[packet.header.space()]
4158 .for_path(path_id)
4159 .pending_acks
4160 .set_immediate_ack_required();
4161 }
4162
4163 self.write_crypto();
4164 Ok(())
4165 }
4166
4167 fn process_payload(
4169 &mut self,
4170 now: Instant,
4171 remote: SocketAddr,
4172 path_id: PathId,
4173 number: u64,
4174 packet: Packet,
4175 #[allow(unused)] qlog: &mut QlogRecvPacket,
4176 ) -> Result<(), TransportError> {
4177 let payload = packet.payload.freeze();
4178 let mut is_probing_packet = true;
4179 let mut close = None;
4180 let payload_len = payload.len();
4181 let mut ack_eliciting = false;
4182 let mut migration_observed_addr = None;
4185 for result in frame::Iter::new(payload)? {
4186 let frame = result?;
4187 qlog.frame(&frame);
4188 let span = match frame {
4189 Frame::Padding => continue,
4190 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4191 };
4192
4193 self.stats.frame_rx.record(&frame);
4194 match &frame {
4197 Frame::Crypto(f) => {
4198 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4199 }
4200 Frame::Stream(f) => {
4201 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4202 }
4203 Frame::Datagram(f) => {
4204 trace!(len = f.data.len(), "got datagram frame");
4205 }
4206 f => {
4207 trace!("got frame {f}");
4208 }
4209 }
4210
4211 let _guard = span.enter();
4212 if packet.header.is_0rtt() {
4213 match frame {
4214 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4215 return Err(TransportError::PROTOCOL_VIOLATION(
4216 "illegal frame type in 0-RTT",
4217 ));
4218 }
4219 _ => {
4220 if frame.is_1rtt() {
4221 return Err(TransportError::PROTOCOL_VIOLATION(
4222 "illegal frame type in 0-RTT",
4223 ));
4224 }
4225 }
4226 }
4227 }
4228 ack_eliciting |= frame.is_ack_eliciting();
4229
4230 match frame {
4232 Frame::Padding
4233 | Frame::PathChallenge(_)
4234 | Frame::PathResponse(_)
4235 | Frame::NewConnectionId(_)
4236 | Frame::ObservedAddr(_) => {}
4237 _ => {
4238 is_probing_packet = false;
4239 }
4240 }
4241
4242 match frame {
4243 Frame::Crypto(frame) => {
4244 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4245 }
4246 Frame::Stream(frame) => {
4247 if self.streams.received(frame, payload_len)?.should_transmit() {
4248 self.spaces[SpaceId::Data].pending.max_data = true;
4249 }
4250 }
4251 Frame::Ack(ack) => {
4252 self.on_ack_received(now, SpaceId::Data, ack)?;
4253 }
4254 Frame::PathAck(ack) => {
4255 span.record("path", tracing::field::debug(&ack.path_id));
4256 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4257 }
4258 Frame::Padding | Frame::Ping => {}
4259 Frame::Close(reason) => {
4260 close = Some(reason);
4261 }
4262 Frame::PathChallenge(challenge) => {
4263 let path = &mut self
4264 .path_mut(path_id)
4265 .expect("payload is processed only after the path becomes known");
4266 path.path_responses.push(number, challenge.0, remote);
4267 if remote == path.remote {
4268 match self.peer_supports_ack_frequency() {
4278 true => self.immediate_ack(path_id),
4279 false => {
4280 self.ping_path(path_id).ok();
4281 }
4282 }
4283 }
4284 }
4285 Frame::PathResponse(response) => {
4286 let path = self
4287 .paths
4288 .get_mut(&path_id)
4289 .expect("payload is processed only after the path becomes known");
4290
4291 use PathTimer::*;
4292 use paths::OnPathResponseReceived::*;
4293 match path.data.on_path_response_received(now, response.0, remote) {
4294 OnPath { was_open } => {
4295 let qlog = self.qlog.with_time(now);
4296
4297 self.timers
4298 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4299 self.timers
4300 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4301
4302 let next_challenge = path
4303 .data
4304 .earliest_expiring_challenge()
4305 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4306 self.timers.set_or_stop(
4307 Timer::PerPath(path_id, PathChallengeLost),
4308 next_challenge,
4309 qlog,
4310 );
4311
4312 if !was_open {
4313 self.events
4314 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4315 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4316 {
4317 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4318 id: path_id,
4319 addr: observed.socket_addr(),
4320 }));
4321 }
4322 }
4323 if let Some((_, ref mut prev)) = path.prev {
4324 prev.challenges_sent.clear();
4325 prev.send_new_challenge = false;
4326 }
4327 }
4328 OffPath => {
4329 debug!("Response to off-path PathChallenge!");
4330 let next_challenge = path
4331 .data
4332 .earliest_expiring_challenge()
4333 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4334 self.timers.set_or_stop(
4335 Timer::PerPath(path_id, PathChallengeLost),
4336 next_challenge,
4337 self.qlog.with_time(now),
4338 );
4339 }
4340 Invalid { expected } => {
4341 debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE")
4342 }
4343 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4344 }
4345 }
4346 Frame::MaxData(bytes) => {
4347 self.streams.received_max_data(bytes);
4348 }
4349 Frame::MaxStreamData { id, offset } => {
4350 self.streams.received_max_stream_data(id, offset)?;
4351 }
4352 Frame::MaxStreams { dir, count } => {
4353 self.streams.received_max_streams(dir, count)?;
4354 }
4355 Frame::ResetStream(frame) => {
4356 if self.streams.received_reset(frame)?.should_transmit() {
4357 self.spaces[SpaceId::Data].pending.max_data = true;
4358 }
4359 }
4360 Frame::DataBlocked { offset } => {
4361 debug!(offset, "peer claims to be blocked at connection level");
4362 }
4363 Frame::StreamDataBlocked { id, offset } => {
4364 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4365 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4366 return Err(TransportError::STREAM_STATE_ERROR(
4367 "STREAM_DATA_BLOCKED on send-only stream",
4368 ));
4369 }
4370 debug!(
4371 stream = %id,
4372 offset, "peer claims to be blocked at stream level"
4373 );
4374 }
4375 Frame::StreamsBlocked { dir, limit } => {
4376 if limit > MAX_STREAM_COUNT {
4377 return Err(TransportError::FRAME_ENCODING_ERROR(
4378 "unrepresentable stream limit",
4379 ));
4380 }
4381 debug!(
4382 "peer claims to be blocked opening more than {} {} streams",
4383 limit, dir
4384 );
4385 }
4386 Frame::StopSending(frame::StopSending { id, error_code }) => {
4387 if id.initiator() != self.side.side() {
4388 if id.dir() == Dir::Uni {
4389 debug!("got STOP_SENDING on recv-only {}", id);
4390 return Err(TransportError::STREAM_STATE_ERROR(
4391 "STOP_SENDING on recv-only stream",
4392 ));
4393 }
4394 } else if self.streams.is_local_unopened(id) {
4395 return Err(TransportError::STREAM_STATE_ERROR(
4396 "STOP_SENDING on unopened stream",
4397 ));
4398 }
4399 self.streams.received_stop_sending(id, error_code);
4400 }
4401 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4402 if let Some(ref path_id) = path_id {
4403 span.record("path", tracing::field::debug(&path_id));
4404 }
4405 let path_id = path_id.unwrap_or_default();
4406 match self.local_cid_state.get_mut(&path_id) {
4407 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4408 Some(cid_state) => {
4409 let allow_more_cids = cid_state
4410 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4411
4412 let has_path = !self.abandoned_paths.contains(&path_id);
4416 let allow_more_cids = allow_more_cids && has_path;
4417
4418 self.endpoint_events
4419 .push_back(EndpointEventInner::RetireConnectionId(
4420 now,
4421 path_id,
4422 sequence,
4423 allow_more_cids,
4424 ));
4425 }
4426 }
4427 }
4428 Frame::NewConnectionId(frame) => {
4429 let path_id = if let Some(path_id) = frame.path_id {
4430 if !self.is_multipath_negotiated() {
4431 return Err(TransportError::PROTOCOL_VIOLATION(
4432 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4433 ));
4434 }
4435 if path_id > self.local_max_path_id {
4436 return Err(TransportError::PROTOCOL_VIOLATION(
4437 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4438 ));
4439 }
4440 path_id
4441 } else {
4442 PathId::ZERO
4443 };
4444
4445 if self.abandoned_paths.contains(&path_id) {
4446 trace!("ignoring issued CID for abandoned path");
4447 continue;
4448 }
4449 if let Some(ref path_id) = frame.path_id {
4450 span.record("path", tracing::field::debug(&path_id));
4451 }
4452 let rem_cids = self
4453 .rem_cids
4454 .entry(path_id)
4455 .or_insert_with(|| CidQueue::new(frame.id));
4456 if rem_cids.active().is_empty() {
4457 return Err(TransportError::PROTOCOL_VIOLATION(
4458 "NEW_CONNECTION_ID when CIDs aren't in use",
4459 ));
4460 }
4461 if frame.retire_prior_to > frame.sequence {
4462 return Err(TransportError::PROTOCOL_VIOLATION(
4463 "NEW_CONNECTION_ID retiring unissued CIDs",
4464 ));
4465 }
4466
4467 use crate::cid_queue::InsertError;
4468 match rem_cids.insert(frame) {
4469 Ok(None) if self.path(path_id).is_none() => {
4470 self.continue_nat_traversal_round(now);
4473 }
4474 Ok(None) => {}
4475 Ok(Some((retired, reset_token))) => {
4476 let pending_retired =
4477 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4478 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4481 if (pending_retired.len() as u64)
4484 .saturating_add(retired.end.saturating_sub(retired.start))
4485 > MAX_PENDING_RETIRED_CIDS
4486 {
4487 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4488 "queued too many retired CIDs",
4489 ));
4490 }
4491 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4492 self.set_reset_token(path_id, remote, reset_token);
4493 }
4494 Err(InsertError::ExceedsLimit) => {
4495 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4496 }
4497 Err(InsertError::Retired) => {
4498 trace!("discarding already-retired");
4499 self.spaces[SpaceId::Data]
4503 .pending
4504 .retire_cids
4505 .push((path_id, frame.sequence));
4506 continue;
4507 }
4508 };
4509
4510 if self.side.is_server()
4511 && path_id == PathId::ZERO
4512 && self
4513 .rem_cids
4514 .get(&PathId::ZERO)
4515 .map(|cids| cids.active_seq() == 0)
4516 .unwrap_or_default()
4517 {
4518 self.update_rem_cid(PathId::ZERO);
4521 }
4522 }
4523 Frame::NewToken(NewToken { token }) => {
4524 let ConnectionSide::Client {
4525 token_store,
4526 server_name,
4527 ..
4528 } = &self.side
4529 else {
4530 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4531 };
4532 if token.is_empty() {
4533 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4534 }
4535 trace!("got new token");
4536 token_store.insert(server_name, token);
4537 }
4538 Frame::Datagram(datagram) => {
4539 if self
4540 .datagrams
4541 .received(datagram, &self.config.datagram_receive_buffer_size)?
4542 {
4543 self.events.push_back(Event::DatagramReceived);
4544 }
4545 }
4546 Frame::AckFrequency(ack_frequency) => {
4547 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4550 continue;
4553 }
4554
4555 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4557 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4558
4559 if let Some(timeout) = space
4562 .pending_acks
4563 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4564 {
4565 self.timers.set(
4566 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4567 timeout,
4568 self.qlog.with_time(now),
4569 );
4570 }
4571 }
4572 }
4573 Frame::ImmediateAck => {
4574 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4576 pns.pending_acks.set_immediate_ack_required();
4577 }
4578 }
4579 Frame::HandshakeDone => {
4580 if self.side.is_server() {
4581 return Err(TransportError::PROTOCOL_VIOLATION(
4582 "client sent HANDSHAKE_DONE",
4583 ));
4584 }
4585 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4586 self.discard_space(now, SpaceId::Handshake);
4587 }
4588 self.events.push_back(Event::HandshakeConfirmed);
4589 trace!("handshake confirmed");
4590 }
4591 Frame::ObservedAddr(observed) => {
4592 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4594 if !self
4595 .peer_params
4596 .address_discovery_role
4597 .should_report(&self.config.address_discovery_role)
4598 {
4599 return Err(TransportError::PROTOCOL_VIOLATION(
4600 "received OBSERVED_ADDRESS frame when not negotiated",
4601 ));
4602 }
4603 if packet.header.space() != SpaceId::Data {
4605 return Err(TransportError::PROTOCOL_VIOLATION(
4606 "OBSERVED_ADDRESS frame outside data space",
4607 ));
4608 }
4609
4610 let path = self.path_data_mut(path_id);
4611 if remote == path.remote {
4612 if let Some(updated) = path.update_observed_addr_report(observed) {
4613 if path.open {
4614 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4615 id: path_id,
4616 addr: updated,
4617 }));
4618 }
4619 }
4621 } else {
4622 migration_observed_addr = Some(observed)
4624 }
4625 }
4626 Frame::PathAbandon(frame::PathAbandon {
4627 path_id,
4628 error_code,
4629 }) => {
4630 span.record("path", tracing::field::debug(&path_id));
4631 let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4633 Ok(()) => {
4634 trace!("peer abandoned path");
4635 false
4636 }
4637 Err(ClosePathError::LastOpenPath) => {
4638 trace!("peer abandoned last path, closing connection");
4639 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4641 }
4642 Err(ClosePathError::ClosedPath) => {
4643 trace!("peer abandoned already closed path");
4644 true
4645 }
4646 };
4647 if self.path(path_id).is_some() && !already_abandoned {
4652 let delay = self.pto(SpaceId::Data, path_id) * 3;
4657 self.timers.set(
4658 Timer::PerPath(path_id, PathTimer::DiscardPath),
4659 now + delay,
4660 self.qlog.with_time(now),
4661 );
4662 }
4663 }
4664 Frame::PathStatusAvailable(info) => {
4665 span.record("path", tracing::field::debug(&info.path_id));
4666 if self.is_multipath_negotiated() {
4667 self.on_path_status(
4668 info.path_id,
4669 PathStatus::Available,
4670 info.status_seq_no,
4671 );
4672 } else {
4673 return Err(TransportError::PROTOCOL_VIOLATION(
4674 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4675 ));
4676 }
4677 }
4678 Frame::PathStatusBackup(info) => {
4679 span.record("path", tracing::field::debug(&info.path_id));
4680 if self.is_multipath_negotiated() {
4681 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4682 } else {
4683 return Err(TransportError::PROTOCOL_VIOLATION(
4684 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4685 ));
4686 }
4687 }
4688 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4689 span.record("path", tracing::field::debug(&path_id));
4690 if !self.is_multipath_negotiated() {
4691 return Err(TransportError::PROTOCOL_VIOLATION(
4692 "received MAX_PATH_ID frame when multipath was not negotiated",
4693 ));
4694 }
4695 if path_id > self.remote_max_path_id {
4697 self.remote_max_path_id = path_id;
4698 self.issue_first_path_cids(now);
4699 while let Some(true) = self.continue_nat_traversal_round(now) {}
4700 }
4701 }
4702 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4703 if self.is_multipath_negotiated() {
4707 if self.local_max_path_id > max_path_id {
4708 return Err(TransportError::PROTOCOL_VIOLATION(
4709 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4710 ));
4711 }
4712 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4713 } else {
4715 return Err(TransportError::PROTOCOL_VIOLATION(
4716 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4717 ));
4718 }
4719 }
4720 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4721 if self.is_multipath_negotiated() {
4729 if path_id > self.local_max_path_id {
4730 return Err(TransportError::PROTOCOL_VIOLATION(
4731 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4732 ));
4733 }
4734 if next_seq.0
4735 > self
4736 .local_cid_state
4737 .get(&path_id)
4738 .map(|cid_state| cid_state.active_seq().1 + 1)
4739 .unwrap_or_default()
4740 {
4741 return Err(TransportError::PROTOCOL_VIOLATION(
4742 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4743 ));
4744 }
4745 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4746 } else {
4747 return Err(TransportError::PROTOCOL_VIOLATION(
4748 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4749 ));
4750 }
4751 }
4752 Frame::AddAddress(addr) => {
4753 let client_state = match self.iroh_hp.client_side_mut() {
4754 Ok(state) => state,
4755 Err(err) => {
4756 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4757 "Nat traversal(ADD_ADDRESS): {err}"
4758 )));
4759 }
4760 };
4761
4762 if !client_state.check_remote_address(&addr) {
4763 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4765 }
4766
4767 match client_state.add_remote_address(addr) {
4768 Ok(maybe_added) => {
4769 if let Some(added) = maybe_added {
4770 self.events.push_back(Event::NatTraversal(
4771 iroh_hp::Event::AddressAdded(added),
4772 ));
4773 }
4774 }
4775 Err(e) => {
4776 warn!(%e, "failed to add remote address")
4777 }
4778 }
4779 }
4780 Frame::RemoveAddress(addr) => {
4781 let client_state = match self.iroh_hp.client_side_mut() {
4782 Ok(state) => state,
4783 Err(err) => {
4784 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4785 "Nat traversal(REMOVE_ADDRESS): {err}"
4786 )));
4787 }
4788 };
4789 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4790 self.events
4791 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4792 removed_addr,
4793 )));
4794 }
4795 }
4796 Frame::ReachOut(reach_out) => {
4797 let server_state = match self.iroh_hp.server_side_mut() {
4798 Ok(state) => state,
4799 Err(err) => {
4800 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4801 "Nat traversal(REACH_OUT): {err}"
4802 )));
4803 }
4804 };
4805
4806 if let Err(err) = server_state.handle_reach_out(reach_out) {
4807 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4808 "Nat traversal(REACH_OUT): {err}"
4809 )));
4810 }
4811 }
4812 }
4813 }
4814
4815 let space = self.spaces[SpaceId::Data].for_path(path_id);
4816 if space
4817 .pending_acks
4818 .packet_received(now, number, ack_eliciting, &space.dedup)
4819 {
4820 if self.abandoned_paths.contains(&path_id) {
4821 space.pending_acks.set_immediate_ack_required();
4824 } else {
4825 self.timers.set(
4826 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4827 now + self.ack_frequency.max_ack_delay,
4828 self.qlog.with_time(now),
4829 );
4830 }
4831 }
4832
4833 let pending = &mut self.spaces[SpaceId::Data].pending;
4838 self.streams.queue_max_stream_id(pending);
4839
4840 if let Some(reason) = close {
4841 self.state.move_to_draining(Some(reason.into()));
4842 self.close = true;
4843 }
4844
4845 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4846 && !is_probing_packet
4847 && remote != self.path_data(path_id).remote
4848 {
4849 let ConnectionSide::Server { ref server_config } = self.side else {
4850 panic!("packets from unknown remote should be dropped by clients");
4851 };
4852 debug_assert!(
4853 server_config.migration,
4854 "migration-initiating packets should have been dropped immediately"
4855 );
4856 self.migrate(path_id, now, remote, migration_observed_addr);
4857 self.update_rem_cid(path_id);
4859 self.spin = false;
4860 }
4861
4862 Ok(())
4863 }
4864
4865 fn migrate(
4866 &mut self,
4867 path_id: PathId,
4868 now: Instant,
4869 remote: SocketAddr,
4870 observed_addr: Option<ObservedAddr>,
4871 ) {
4872 trace!(%remote, %path_id, "migration initiated");
4873 self.path_counter = self.path_counter.wrapping_add(1);
4874 let prev_pto = self.pto(SpaceId::Data, path_id);
4881 let known_path = self.paths.get_mut(&path_id).expect("known path");
4882 let path = &mut known_path.data;
4883 let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4884 PathData::from_previous(remote, path, self.path_counter, now)
4885 } else {
4886 let peer_max_udp_payload_size =
4887 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4888 .unwrap_or(u16::MAX);
4889 PathData::new(
4890 remote,
4891 self.allow_mtud,
4892 Some(peer_max_udp_payload_size),
4893 self.path_counter,
4894 now,
4895 &self.config,
4896 )
4897 };
4898 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4899 if let Some(report) = observed_addr {
4900 if let Some(updated) = new_path.update_observed_addr_report(report) {
4901 tracing::info!("adding observed addr event from migration");
4902 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4903 id: path_id,
4904 addr: updated,
4905 }));
4906 }
4907 }
4908 new_path.send_new_challenge = true;
4909
4910 let mut prev = mem::replace(path, new_path);
4911 if !prev.is_validating_path() {
4913 prev.send_new_challenge = true;
4914 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4918 }
4919
4920 self.qlog.emit_tuple_assigned(path_id, remote, now);
4922
4923 self.timers.set(
4924 Timer::PerPath(path_id, PathTimer::PathValidation),
4925 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4926 self.qlog.with_time(now),
4927 );
4928 }
4929
4930 pub fn local_address_changed(&mut self) {
4932 self.update_rem_cid(PathId::ZERO);
4934 self.ping();
4935 }
4936
4937 fn update_rem_cid(&mut self, path_id: PathId) {
4939 let Some((reset_token, retired)) =
4940 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4941 else {
4942 return;
4943 };
4944
4945 self.spaces[SpaceId::Data]
4947 .pending
4948 .retire_cids
4949 .extend(retired.map(|seq| (path_id, seq)));
4950 let remote = self.path_data(path_id).remote;
4951 self.set_reset_token(path_id, remote, reset_token);
4952 }
4953
4954 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4963 self.endpoint_events
4964 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4965
4966 if path_id == PathId::ZERO {
4972 self.peer_params.stateless_reset_token = Some(reset_token);
4973 }
4974 }
4975
4976 fn issue_first_cids(&mut self, now: Instant) {
4978 if self
4979 .local_cid_state
4980 .get(&PathId::ZERO)
4981 .expect("PathId::ZERO exists when the connection is created")
4982 .cid_len()
4983 == 0
4984 {
4985 return;
4986 }
4987
4988 let mut n = self.peer_params.issue_cids_limit() - 1;
4990 if let ConnectionSide::Server { server_config } = &self.side {
4991 if server_config.has_preferred_address() {
4992 n -= 1;
4994 }
4995 }
4996 self.endpoint_events
4997 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
4998 }
4999
5000 fn issue_first_path_cids(&mut self, now: Instant) {
5004 if let Some(max_path_id) = self.max_path_id() {
5005 let mut path_id = self.max_path_id_with_cids.next();
5006 while path_id <= max_path_id {
5007 self.endpoint_events
5008 .push_back(EndpointEventInner::NeedIdentifiers(
5009 path_id,
5010 now,
5011 self.peer_params.issue_cids_limit(),
5012 ));
5013 path_id = path_id.next();
5014 }
5015 self.max_path_id_with_cids = max_path_id;
5016 }
5017 }
5018
5019 fn populate_packet(
5027 &mut self,
5028 now: Instant,
5029 space_id: SpaceId,
5030 path_id: PathId,
5031 path_exclusive_only: bool,
5032 buf: &mut impl BufMut,
5033 pn: u64,
5034 #[allow(unused)] qlog: &mut QlogSentPacket,
5035 ) -> SentFrames {
5036 let mut sent = SentFrames::default();
5037 let is_multipath_negotiated = self.is_multipath_negotiated();
5038 let space = &mut self.spaces[space_id];
5039 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5040 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5041 space
5042 .for_path(path_id)
5043 .pending_acks
5044 .maybe_ack_non_eliciting();
5045
5046 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5048 trace!("HANDSHAKE_DONE");
5049 buf.write(frame::FrameType::HandshakeDone);
5050 qlog.frame(&Frame::HandshakeDone);
5051 sent.retransmits.get_or_create().handshake_done = true;
5052 self.stats.frame_tx.handshake_done =
5054 self.stats.frame_tx.handshake_done.saturating_add(1);
5055 }
5056
5057 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5060 while let Some(local_addr) = addresses.pop() {
5061 let reach_out = frame::ReachOut::new(*round, local_addr);
5062 if buf.remaining_mut() > reach_out.size() {
5063 trace!(%round, ?local_addr, "REACH_OUT");
5064 reach_out.encode(buf);
5065 let sent_reachouts = sent
5066 .retransmits
5067 .get_or_create()
5068 .reach_out
5069 .get_or_insert_with(|| (*round, Default::default()));
5070 sent_reachouts.1.push(local_addr);
5071 self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5072 qlog.frame(&Frame::ReachOut(reach_out));
5073 } else {
5074 addresses.push(local_addr);
5075 break;
5076 }
5077 }
5078 if addresses.is_empty() {
5079 space.pending.reach_out = None;
5080 }
5081 }
5082
5083 if !path_exclusive_only
5085 && space_id == SpaceId::Data
5086 && self
5087 .config
5088 .address_discovery_role
5089 .should_report(&self.peer_params.address_discovery_role)
5090 && (!path.observed_addr_sent || space.pending.observed_addr)
5091 {
5092 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5093 if buf.remaining_mut() > frame.size() {
5094 trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5095 frame.encode(buf);
5096
5097 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5098 path.observed_addr_sent = true;
5099
5100 self.stats.frame_tx.observed_addr += 1;
5101 sent.retransmits.get_or_create().observed_addr = true;
5102 space.pending.observed_addr = false;
5103 qlog.frame(&Frame::ObservedAddr(frame));
5104 }
5105 }
5106
5107 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5109 trace!("PING");
5110 buf.write(frame::FrameType::Ping);
5111 sent.non_retransmits = true;
5112 self.stats.frame_tx.ping += 1;
5113 qlog.frame(&Frame::Ping);
5114 }
5115
5116 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5118 debug_assert_eq!(
5119 space_id,
5120 SpaceId::Data,
5121 "immediate acks must be sent in the data space"
5122 );
5123 trace!("IMMEDIATE_ACK");
5124 buf.write(frame::FrameType::ImmediateAck);
5125 sent.non_retransmits = true;
5126 self.stats.frame_tx.immediate_ack += 1;
5127 qlog.frame(&Frame::ImmediateAck);
5128 }
5129
5130 if !path_exclusive_only {
5134 for path_id in space
5135 .number_spaces
5136 .iter_mut()
5137 .filter(|(_, pns)| pns.pending_acks.can_send())
5138 .map(|(&path_id, _)| path_id)
5139 .collect::<Vec<_>>()
5140 {
5141 Self::populate_acks(
5142 now,
5143 self.receiving_ecn,
5144 &mut sent,
5145 path_id,
5146 space_id,
5147 space,
5148 is_multipath_negotiated,
5149 buf,
5150 &mut self.stats,
5151 qlog,
5152 );
5153 }
5154 }
5155
5156 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5158 let sequence_number = self.ack_frequency.next_sequence_number();
5159
5160 let config = self.config.ack_frequency_config.as_ref().unwrap();
5162
5163 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5165 path.rtt.get(),
5166 config,
5167 &self.peer_params,
5168 );
5169
5170 trace!(?max_ack_delay, "ACK_FREQUENCY");
5171
5172 let frame = frame::AckFrequency {
5173 sequence: sequence_number,
5174 ack_eliciting_threshold: config.ack_eliciting_threshold,
5175 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5176 reordering_threshold: config.reordering_threshold,
5177 };
5178 frame.encode(buf);
5179 qlog.frame(&Frame::AckFrequency(frame));
5180
5181 sent.retransmits.get_or_create().ack_frequency = true;
5182
5183 self.ack_frequency
5184 .ack_frequency_sent(path_id, pn, max_ack_delay);
5185 self.stats.frame_tx.ack_frequency += 1;
5186 }
5187
5188 if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5190 && space_id == SpaceId::Data
5191 && path.send_new_challenge
5192 && !self.state.is_closed()
5193 {
5195 path.send_new_challenge = false;
5196
5197 let token = self.rng.random();
5199 let info = paths::SentChallengeInfo {
5200 sent_instant: now,
5201 remote: path.remote,
5202 };
5203 path.challenges_sent.insert(token, info);
5204 sent.non_retransmits = true;
5205 sent.requires_padding = true;
5206 let challenge = frame::PathChallenge(token);
5207 trace!(frame = %challenge);
5208 buf.write(challenge);
5209 qlog.frame(&Frame::PathChallenge(challenge));
5210 self.stats.frame_tx.path_challenge += 1;
5211 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5212 self.timers.set(
5213 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5214 now + pto,
5215 self.qlog.with_time(now),
5216 );
5217
5218 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5219 space.pending.path_status.insert(path_id);
5221 }
5222
5223 if space_id == SpaceId::Data
5226 && self
5227 .config
5228 .address_discovery_role
5229 .should_report(&self.peer_params.address_discovery_role)
5230 {
5231 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5232 if buf.remaining_mut() > frame.size() {
5233 frame.encode(buf);
5234 qlog.frame(&Frame::ObservedAddr(frame));
5235
5236 self.next_observed_addr_seq_no =
5237 self.next_observed_addr_seq_no.saturating_add(1u8);
5238 path.observed_addr_sent = true;
5239
5240 self.stats.frame_tx.observed_addr += 1;
5241 sent.retransmits.get_or_create().observed_addr = true;
5242 space.pending.observed_addr = false;
5243 }
5244 }
5245 }
5246
5247 if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5249 if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5250 sent.non_retransmits = true;
5251 sent.requires_padding = true;
5252 let response = frame::PathResponse(token);
5253 trace!(frame = %response);
5254 buf.write(response);
5255 qlog.frame(&Frame::PathResponse(response));
5256 self.stats.frame_tx.path_response += 1;
5257
5258 if space_id == SpaceId::Data
5262 && self
5263 .config
5264 .address_discovery_role
5265 .should_report(&self.peer_params.address_discovery_role)
5266 {
5267 let frame =
5268 frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5269 if buf.remaining_mut() > frame.size() {
5270 frame.encode(buf);
5271 qlog.frame(&Frame::ObservedAddr(frame));
5272
5273 self.next_observed_addr_seq_no =
5274 self.next_observed_addr_seq_no.saturating_add(1u8);
5275 path.observed_addr_sent = true;
5276
5277 self.stats.frame_tx.observed_addr += 1;
5278 sent.retransmits.get_or_create().observed_addr = true;
5279 space.pending.observed_addr = false;
5280 }
5281 }
5282 }
5283 }
5284
5285 while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5287 let mut frame = match space.pending.crypto.pop_front() {
5288 Some(x) => x,
5289 None => break,
5290 };
5291
5292 let max_crypto_data_size = buf.remaining_mut()
5297 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5299 - 2; let len = frame
5302 .data
5303 .len()
5304 .min(2usize.pow(14) - 1)
5305 .min(max_crypto_data_size);
5306
5307 let data = frame.data.split_to(len);
5308 let truncated = frame::Crypto {
5309 offset: frame.offset,
5310 data,
5311 };
5312 trace!(
5313 "CRYPTO: off {} len {}",
5314 truncated.offset,
5315 truncated.data.len()
5316 );
5317 truncated.encode(buf);
5318 self.stats.frame_tx.crypto += 1;
5319
5320 #[cfg(feature = "qlog")]
5322 qlog.frame(&Frame::Crypto(truncated.clone()));
5323 sent.retransmits.get_or_create().crypto.push_back(truncated);
5324 if !frame.data.is_empty() {
5325 frame.offset += len as u64;
5326 space.pending.crypto.push_front(frame);
5327 }
5328 }
5329
5330 while !path_exclusive_only
5333 && space_id == SpaceId::Data
5334 && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5335 {
5336 let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5337 break;
5338 };
5339 let frame = frame::PathAbandon {
5340 path_id,
5341 error_code,
5342 };
5343 frame.encode(buf);
5344 qlog.frame(&Frame::PathAbandon(frame));
5345 self.stats.frame_tx.path_abandon += 1;
5346 trace!(%path_id, "PATH_ABANDON");
5347 sent.retransmits
5348 .get_or_create()
5349 .path_abandon
5350 .entry(path_id)
5351 .or_insert(error_code);
5352 }
5353
5354 while !path_exclusive_only
5356 && space_id == SpaceId::Data
5357 && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5358 {
5359 let Some(path_id) = space.pending.path_status.pop_first() else {
5360 break;
5361 };
5362 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5363 trace!(%path_id, "discarding queued path status for unknown path");
5364 continue;
5365 };
5366
5367 let seq = path.status.seq();
5368 sent.retransmits.get_or_create().path_status.insert(path_id);
5369 match path.local_status() {
5370 PathStatus::Available => {
5371 let frame = frame::PathStatusAvailable {
5372 path_id,
5373 status_seq_no: seq,
5374 };
5375 frame.encode(buf);
5376 qlog.frame(&Frame::PathStatusAvailable(frame));
5377 self.stats.frame_tx.path_status_available += 1;
5378 trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5379 }
5380 PathStatus::Backup => {
5381 let frame = frame::PathStatusBackup {
5382 path_id,
5383 status_seq_no: seq,
5384 };
5385 frame.encode(buf);
5386 qlog.frame(&Frame::PathStatusBackup(frame));
5387 self.stats.frame_tx.path_status_backup += 1;
5388 trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5389 }
5390 }
5391 }
5392
5393 if space_id == SpaceId::Data
5395 && space.pending.max_path_id
5396 && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5397 {
5398 let frame = frame::MaxPathId(self.local_max_path_id);
5399 frame.encode(buf);
5400 qlog.frame(&Frame::MaxPathId(frame));
5401 space.pending.max_path_id = false;
5402 sent.retransmits.get_or_create().max_path_id = true;
5403 trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5404 self.stats.frame_tx.max_path_id += 1;
5405 }
5406
5407 if space_id == SpaceId::Data
5409 && space.pending.paths_blocked
5410 && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5411 {
5412 let frame = frame::PathsBlocked(self.remote_max_path_id);
5413 frame.encode(buf);
5414 qlog.frame(&Frame::PathsBlocked(frame));
5415 space.pending.paths_blocked = false;
5416 sent.retransmits.get_or_create().paths_blocked = true;
5417 trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5418 self.stats.frame_tx.paths_blocked += 1;
5419 }
5420
5421 while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5423 {
5424 let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5425 break;
5426 };
5427 let next_seq = match self.rem_cids.get(&path_id) {
5428 Some(cid_queue) => cid_queue.active_seq() + 1,
5429 None => 0,
5430 };
5431 let frame = frame::PathCidsBlocked {
5432 path_id,
5433 next_seq: VarInt(next_seq),
5434 };
5435 frame.encode(buf);
5436 qlog.frame(&Frame::PathCidsBlocked(frame));
5437 sent.retransmits
5438 .get_or_create()
5439 .path_cids_blocked
5440 .push(path_id);
5441 trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5442 self.stats.frame_tx.path_cids_blocked += 1;
5443 }
5444
5445 if space_id == SpaceId::Data {
5447 self.streams.write_control_frames(
5448 buf,
5449 &mut space.pending,
5450 &mut sent.retransmits,
5451 &mut self.stats.frame_tx,
5452 qlog,
5453 );
5454 }
5455
5456 let cid_len = self
5458 .local_cid_state
5459 .values()
5460 .map(|cid_state| cid_state.cid_len())
5461 .max()
5462 .expect("some local CID state must exist");
5463 let new_cid_size_bound =
5464 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5465 while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5466 let issued = match space.pending.new_cids.pop() {
5467 Some(x) => x,
5468 None => break,
5469 };
5470 let retire_prior_to = self
5471 .local_cid_state
5472 .get(&issued.path_id)
5473 .map(|cid_state| cid_state.retire_prior_to())
5474 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5475
5476 let cid_path_id = match is_multipath_negotiated {
5477 true => {
5478 trace!(
5479 path_id = ?issued.path_id,
5480 sequence = issued.sequence,
5481 id = %issued.id,
5482 "PATH_NEW_CONNECTION_ID",
5483 );
5484 self.stats.frame_tx.path_new_connection_id += 1;
5485 Some(issued.path_id)
5486 }
5487 false => {
5488 trace!(
5489 sequence = issued.sequence,
5490 id = %issued.id,
5491 "NEW_CONNECTION_ID"
5492 );
5493 debug_assert_eq!(issued.path_id, PathId::ZERO);
5494 self.stats.frame_tx.new_connection_id += 1;
5495 None
5496 }
5497 };
5498 let frame = frame::NewConnectionId {
5499 path_id: cid_path_id,
5500 sequence: issued.sequence,
5501 retire_prior_to,
5502 id: issued.id,
5503 reset_token: issued.reset_token,
5504 };
5505 frame.encode(buf);
5506 sent.retransmits.get_or_create().new_cids.push(issued);
5507 qlog.frame(&Frame::NewConnectionId(frame));
5508 }
5509
5510 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5512 while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5513 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5514 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5515 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5516 self.stats.frame_tx.retire_connection_id += 1;
5517 (None, seq)
5518 }
5519 Some((path_id, seq)) => {
5520 trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5521 self.stats.frame_tx.path_retire_connection_id += 1;
5522 (Some(path_id), seq)
5523 }
5524 None => break,
5525 };
5526 let frame = frame::RetireConnectionId { path_id, sequence };
5527 frame.encode(buf);
5528 qlog.frame(&Frame::RetireConnectionId(frame));
5529 sent.retransmits
5530 .get_or_create()
5531 .retire_cids
5532 .push((path_id.unwrap_or_default(), sequence));
5533 }
5534
5535 let mut sent_datagrams = false;
5537 while !path_exclusive_only
5538 && buf.remaining_mut() > Datagram::SIZE_BOUND
5539 && space_id == SpaceId::Data
5540 {
5541 let prev_remaining = buf.remaining_mut();
5542 match self.datagrams.write(buf) {
5543 true => {
5544 sent_datagrams = true;
5545 sent.non_retransmits = true;
5546 self.stats.frame_tx.datagram += 1;
5547 qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5548 }
5549 false => break,
5550 }
5551 }
5552 if self.datagrams.send_blocked && sent_datagrams {
5553 self.events.push_back(Event::DatagramsUnblocked);
5554 self.datagrams.send_blocked = false;
5555 }
5556
5557 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5558
5559 while let Some(remote_addr) = space.pending.new_tokens.pop() {
5561 if path_exclusive_only {
5562 break;
5563 }
5564 debug_assert_eq!(space_id, SpaceId::Data);
5565 let ConnectionSide::Server { server_config } = &self.side else {
5566 panic!("NEW_TOKEN frames should not be enqueued by clients");
5567 };
5568
5569 if remote_addr != path.remote {
5570 continue;
5575 }
5576
5577 let token = Token::new(
5578 TokenPayload::Validation {
5579 ip: remote_addr.ip(),
5580 issued: server_config.time_source.now(),
5581 },
5582 &mut self.rng,
5583 );
5584 let new_token = NewToken {
5585 token: token.encode(&*server_config.token_key).into(),
5586 };
5587
5588 if buf.remaining_mut() < new_token.size() {
5589 space.pending.new_tokens.push(remote_addr);
5590 break;
5591 }
5592
5593 trace!("NEW_TOKEN");
5594 new_token.encode(buf);
5595 qlog.frame(&Frame::NewToken(new_token));
5596 sent.retransmits
5597 .get_or_create()
5598 .new_tokens
5599 .push(remote_addr);
5600 self.stats.frame_tx.new_token += 1;
5601 }
5602
5603 if !path_exclusive_only && space_id == SpaceId::Data {
5605 sent.stream_frames =
5606 self.streams
5607 .write_stream_frames(buf, self.config.send_fairness, qlog);
5608 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5609 }
5610
5611 while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5614 if let Some(added_address) = space.pending.add_address.pop_last() {
5615 trace!(
5616 seq = %added_address.seq_no,
5617 ip = ?added_address.ip,
5618 port = added_address.port,
5619 "ADD_ADDRESS",
5620 );
5621 added_address.encode(buf);
5622 sent.retransmits
5623 .get_or_create()
5624 .add_address
5625 .insert(added_address);
5626 self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5627 qlog.frame(&Frame::AddAddress(added_address));
5628 } else {
5629 break;
5630 }
5631 }
5632
5633 while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5635 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5636 trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5637 removed_address.encode(buf);
5638 sent.retransmits
5639 .get_or_create()
5640 .remove_address
5641 .insert(removed_address);
5642 self.stats.frame_tx.remove_address =
5643 self.stats.frame_tx.remove_address.saturating_add(1);
5644 qlog.frame(&Frame::RemoveAddress(removed_address));
5645 } else {
5646 break;
5647 }
5648 }
5649
5650 sent
5651 }
5652
5653 fn populate_acks(
5655 now: Instant,
5656 receiving_ecn: bool,
5657 sent: &mut SentFrames,
5658 path_id: PathId,
5659 space_id: SpaceId,
5660 space: &mut PacketSpace,
5661 is_multipath_negotiated: bool,
5662 buf: &mut impl BufMut,
5663 stats: &mut ConnectionStats,
5664 #[allow(unused)] qlog: &mut QlogSentPacket,
5665 ) {
5666 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5668
5669 debug_assert!(
5670 is_multipath_negotiated || path_id == PathId::ZERO,
5671 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5672 );
5673 if is_multipath_negotiated {
5674 debug_assert!(
5675 space_id == SpaceId::Data || path_id == PathId::ZERO,
5676 "path acks must be sent in 1RTT space (have {space_id:?})"
5677 );
5678 }
5679
5680 let pns = space.for_path(path_id);
5681 let ranges = pns.pending_acks.ranges();
5682 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5683 let ecn = if receiving_ecn {
5684 Some(&pns.ecn_counters)
5685 } else {
5686 None
5687 };
5688 if let Some(max) = ranges.max() {
5689 sent.largest_acked.insert(path_id, max);
5690 }
5691
5692 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5693 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5695 let delay = delay_micros >> ack_delay_exp.into_inner();
5696
5697 if is_multipath_negotiated && space_id == SpaceId::Data {
5698 if !ranges.is_empty() {
5699 trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5700 frame::PathAck::encoder(path_id, delay as _, ranges, ecn).encode(buf);
5701 qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5702 stats.frame_tx.path_acks += 1;
5703 }
5704 } else {
5705 trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5706 frame::Ack::encoder(delay as _, ranges, ecn).encode(buf);
5707 stats.frame_tx.acks += 1;
5708 qlog.frame_ack(delay, ranges, ecn);
5709 }
5710 }
5711
5712 fn close_common(&mut self) {
5713 trace!("connection closed");
5714 self.timers.reset();
5715 }
5716
5717 fn set_close_timer(&mut self, now: Instant) {
5718 let pto_max = self.pto_max_path(self.highest_space, true);
5721 self.timers.set(
5722 Timer::Conn(ConnTimer::Close),
5723 now + 3 * pto_max,
5724 self.qlog.with_time(now),
5725 );
5726 }
5727
5728 fn handle_peer_params(
5733 &mut self,
5734 params: TransportParameters,
5735 loc_cid: ConnectionId,
5736 rem_cid: ConnectionId,
5737 now: Instant,
5738 ) -> Result<(), TransportError> {
5739 if Some(self.orig_rem_cid) != params.initial_src_cid
5740 || (self.side.is_client()
5741 && (Some(self.initial_dst_cid) != params.original_dst_cid
5742 || self.retry_src_cid != params.retry_src_cid))
5743 {
5744 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5745 "CID authentication failure",
5746 ));
5747 }
5748 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5749 return Err(TransportError::PROTOCOL_VIOLATION(
5750 "multipath must not use zero-length CIDs",
5751 ));
5752 }
5753
5754 self.set_peer_params(params);
5755 self.qlog.emit_peer_transport_params_received(self, now);
5756
5757 Ok(())
5758 }
5759
5760 fn set_peer_params(&mut self, params: TransportParameters) {
5761 self.streams.set_params(¶ms);
5762 self.idle_timeout =
5763 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5764 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5765
5766 if let Some(ref info) = params.preferred_address {
5767 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5769 path_id: None,
5770 sequence: 1,
5771 id: info.connection_id,
5772 reset_token: info.stateless_reset_token,
5773 retire_prior_to: 0,
5774 })
5775 .expect(
5776 "preferred address CID is the first received, and hence is guaranteed to be legal",
5777 );
5778 let remote = self.path_data(PathId::ZERO).remote;
5779 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5780 }
5781 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5782
5783 let mut multipath_enabled = None;
5784 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5785 self.config.get_initial_max_path_id(),
5786 params.initial_max_path_id,
5787 ) {
5788 self.local_max_path_id = local_max_path_id;
5790 self.remote_max_path_id = remote_max_path_id;
5791 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5792 debug!(%initial_max_path_id, "multipath negotiated");
5793 multipath_enabled = Some(initial_max_path_id);
5794 }
5795
5796 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5797 self.config
5798 .max_remote_nat_traversal_addresses
5799 .zip(params.max_remote_nat_traversal_addresses)
5800 {
5801 if let Some(max_initial_paths) =
5802 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5803 {
5804 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5805 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5806 self.iroh_hp =
5807 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5808 debug!(
5809 %max_remote_addresses, %max_local_addresses,
5810 "iroh hole punching negotiated"
5811 );
5812
5813 match self.side() {
5814 Side::Client => {
5815 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5816 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5819 } else if max_local_addresses as u64
5820 > params.active_connection_id_limit.into_inner()
5821 {
5822 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5826 }
5827 }
5828 Side::Server => {
5829 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5830 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5831 }
5832 }
5833 }
5834 } else {
5835 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5836 }
5837 }
5838
5839 self.peer_params = params;
5840 let peer_max_udp_payload_size =
5841 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5842 self.path_data_mut(PathId::ZERO)
5843 .mtud
5844 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5845 }
5846
5847 fn decrypt_packet(
5849 &mut self,
5850 now: Instant,
5851 path_id: PathId,
5852 packet: &mut Packet,
5853 ) -> Result<Option<u64>, Option<TransportError>> {
5854 let result = packet_crypto::decrypt_packet_body(
5855 packet,
5856 path_id,
5857 &self.spaces,
5858 self.zero_rtt_crypto.as_ref(),
5859 self.key_phase,
5860 self.prev_crypto.as_ref(),
5861 self.next_crypto.as_ref(),
5862 )?;
5863
5864 let result = match result {
5865 Some(r) => r,
5866 None => return Ok(None),
5867 };
5868
5869 if result.outgoing_key_update_acked {
5870 if let Some(prev) = self.prev_crypto.as_mut() {
5871 prev.end_packet = Some((result.number, now));
5872 self.set_key_discard_timer(now, packet.header.space());
5873 }
5874 }
5875
5876 if result.incoming_key_update {
5877 trace!("key update authenticated");
5878 self.update_keys(Some((result.number, now)), true);
5879 self.set_key_discard_timer(now, packet.header.space());
5880 }
5881
5882 Ok(Some(result.number))
5883 }
5884
5885 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5886 trace!("executing key update");
5887 let new = self
5891 .crypto
5892 .next_1rtt_keys()
5893 .expect("only called for `Data` packets");
5894 self.key_phase_size = new
5895 .local
5896 .confidentiality_limit()
5897 .saturating_sub(KEY_UPDATE_MARGIN);
5898 let old = mem::replace(
5899 &mut self.spaces[SpaceId::Data]
5900 .crypto
5901 .as_mut()
5902 .unwrap() .packet,
5904 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5905 );
5906 self.spaces[SpaceId::Data]
5907 .iter_paths_mut()
5908 .for_each(|s| s.sent_with_keys = 0);
5909 self.prev_crypto = Some(PrevCrypto {
5910 crypto: old,
5911 end_packet,
5912 update_unacked: remote,
5913 });
5914 self.key_phase = !self.key_phase;
5915 }
5916
5917 fn peer_supports_ack_frequency(&self) -> bool {
5918 self.peer_params.min_ack_delay.is_some()
5919 }
5920
5921 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5926 debug_assert_eq!(
5927 self.highest_space,
5928 SpaceId::Data,
5929 "immediate ack must be written in the data space"
5930 );
5931 self.spaces[self.highest_space]
5932 .for_path(path_id)
5933 .immediate_ack_pending = true;
5934 }
5935
5936 #[cfg(test)]
5938 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5939 let (path_id, first_decode, remaining) = match &event.0 {
5940 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5941 path_id,
5942 first_decode,
5943 remaining,
5944 ..
5945 }) => (path_id, first_decode, remaining),
5946 _ => return None,
5947 };
5948
5949 if remaining.is_some() {
5950 panic!("Packets should never be coalesced in tests");
5951 }
5952
5953 let decrypted_header = packet_crypto::unprotect_header(
5954 first_decode.clone(),
5955 &self.spaces,
5956 self.zero_rtt_crypto.as_ref(),
5957 self.peer_params.stateless_reset_token,
5958 )?;
5959
5960 let mut packet = decrypted_header.packet?;
5961 packet_crypto::decrypt_packet_body(
5962 &mut packet,
5963 *path_id,
5964 &self.spaces,
5965 self.zero_rtt_crypto.as_ref(),
5966 self.key_phase,
5967 self.prev_crypto.as_ref(),
5968 self.next_crypto.as_ref(),
5969 )
5970 .ok()?;
5971
5972 Some(packet.payload.to_vec())
5973 }
5974
5975 #[cfg(test)]
5978 pub(crate) fn bytes_in_flight(&self) -> u64 {
5979 self.path_data(PathId::ZERO).in_flight.bytes
5981 }
5982
5983 #[cfg(test)]
5985 pub(crate) fn congestion_window(&self) -> u64 {
5986 let path = self.path_data(PathId::ZERO);
5987 path.congestion
5988 .window()
5989 .saturating_sub(path.in_flight.bytes)
5990 }
5991
5992 #[cfg(test)]
5994 pub(crate) fn is_idle(&self) -> bool {
5995 let current_timers = self.timers.values();
5996 current_timers
5997 .into_iter()
5998 .filter(|(timer, _)| {
5999 !matches!(
6000 timer,
6001 Timer::Conn(ConnTimer::KeepAlive)
6002 | Timer::PerPath(_, PathTimer::PathKeepAlive)
6003 | Timer::Conn(ConnTimer::PushNewCid)
6004 | Timer::Conn(ConnTimer::KeyDiscard)
6005 )
6006 })
6007 .min_by_key(|(_, time)| *time)
6008 .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6009 }
6010
6011 #[cfg(test)]
6013 pub(crate) fn using_ecn(&self) -> bool {
6014 self.path_data(PathId::ZERO).sending_ecn
6015 }
6016
6017 #[cfg(test)]
6019 pub(crate) fn total_recvd(&self) -> u64 {
6020 self.path_data(PathId::ZERO).total_recvd
6021 }
6022
6023 #[cfg(test)]
6024 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6025 self.local_cid_state
6026 .get(&PathId::ZERO)
6027 .unwrap()
6028 .active_seq()
6029 }
6030
6031 #[cfg(test)]
6032 #[track_caller]
6033 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6034 self.local_cid_state
6035 .get(&PathId(path_id))
6036 .unwrap()
6037 .active_seq()
6038 }
6039
6040 #[cfg(test)]
6043 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6044 let n = self
6045 .local_cid_state
6046 .get_mut(&PathId::ZERO)
6047 .unwrap()
6048 .assign_retire_seq(v);
6049 self.endpoint_events
6050 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6051 }
6052
6053 #[cfg(test)]
6055 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6056 self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6057 }
6058
6059 #[cfg(test)]
6061 pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6062 self.path_data(path_id).current_mtu()
6063 }
6064
6065 #[cfg(test)]
6067 pub(crate) fn trigger_path_validation(&mut self) {
6068 for path in self.paths.values_mut() {
6069 path.data.send_new_challenge = true;
6070 }
6071 }
6072
6073 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6084 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6085 path.data.send_new_challenge
6086 || path
6087 .prev
6088 .as_ref()
6089 .is_some_and(|(_, path)| path.send_new_challenge)
6090 || !path.data.path_responses.is_empty()
6091 });
6092 let other = self.streams.can_send_stream_data()
6093 || self
6094 .datagrams
6095 .outgoing
6096 .front()
6097 .is_some_and(|x| x.size(true) <= max_size);
6098 SendableFrames {
6099 acks: false,
6100 other,
6101 close: false,
6102 path_exclusive,
6103 }
6104 }
6105
6106 fn kill(&mut self, reason: ConnectionError) {
6108 self.close_common();
6109 self.state.move_to_drained(Some(reason));
6110 self.endpoint_events.push_back(EndpointEventInner::Drained);
6111 }
6112
6113 pub fn current_mtu(&self) -> u16 {
6120 self.paths
6121 .iter()
6122 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6123 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6124 .min()
6125 .expect("There is always at least one available path")
6126 }
6127
6128 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6135 let pn_len = PacketNumber::new(
6136 pn,
6137 self.spaces[SpaceId::Data]
6138 .for_path(path)
6139 .largest_acked_packet
6140 .unwrap_or(0),
6141 )
6142 .len();
6143
6144 1 + self
6146 .rem_cids
6147 .get(&path)
6148 .map(|cids| cids.active().len())
6149 .unwrap_or(20) + pn_len
6151 + self.tag_len_1rtt()
6152 }
6153
6154 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6155 let pn_len = 4;
6156
6157 let cid_len = self
6158 .rem_cids
6159 .values()
6160 .map(|cids| cids.active().len())
6161 .max()
6162 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6166 }
6167
6168 fn tag_len_1rtt(&self) -> usize {
6169 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6170 Some(crypto) => Some(&*crypto.packet.local),
6171 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6172 };
6173 key.map_or(16, |x| x.tag_len())
6177 }
6178
6179 fn on_path_validated(&mut self, path_id: PathId) {
6181 self.path_data_mut(path_id).validated = true;
6182 let ConnectionSide::Server { server_config } = &self.side else {
6183 return;
6184 };
6185 let remote_addr = self.path_data(path_id).remote;
6186 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6187 new_tokens.clear();
6188 for _ in 0..server_config.validation_token.sent {
6189 new_tokens.push(remote_addr);
6190 }
6191 }
6192
6193 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6195 if let Some(path) = self.paths.get_mut(&path_id) {
6196 path.data.status.remote_update(status, status_seq_no);
6197 } else {
6198 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6199 }
6200 self.events.push_back(
6201 PathEvent::RemoteStatus {
6202 id: path_id,
6203 status,
6204 }
6205 .into(),
6206 );
6207 }
6208
6209 fn max_path_id(&self) -> Option<PathId> {
6218 if self.is_multipath_negotiated() {
6219 Some(self.remote_max_path_id.min(self.local_max_path_id))
6220 } else {
6221 None
6222 }
6223 }
6224
6225 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6227 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6228 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6229 };
6230 Ok(())
6231 }
6232
6233 pub fn remove_nat_traversal_address(
6237 &mut self,
6238 address: SocketAddr,
6239 ) -> Result<(), iroh_hp::Error> {
6240 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6241 self.spaces[SpaceId::Data]
6242 .pending
6243 .remove_address
6244 .insert(removed);
6245 }
6246 Ok(())
6247 }
6248
6249 pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6251 self.iroh_hp.get_local_nat_traversal_addresses()
6252 }
6253
6254 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6256 Ok(self
6257 .iroh_hp
6258 .client_side()?
6259 .get_remote_nat_traversal_addresses())
6260 }
6261
6262 fn open_nat_traversal_path(
6270 &mut self,
6271 now: Instant,
6272 (ip, port): (IpAddr, u16),
6273 ipv6: bool,
6274 ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6275 let remote = match ip {
6277 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6278 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6279 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6280 IpAddr::V6(_) => {
6281 trace!("not using IPv6 nat candidate for IPv4 socket");
6282 return Ok(None);
6283 }
6284 };
6285 match self.open_path_ensure(remote, PathStatus::Backup, now) {
6286 Ok((path_id, path_was_known)) => {
6287 if path_was_known {
6288 trace!(%path_id, %remote, "nat traversal: path existed for remote");
6289 }
6290 Ok(Some((path_id, remote, path_was_known)))
6291 }
6292 Err(e) => {
6293 debug!(%remote, %e, "nat traversal: failed to probe remote");
6294 Err(e)
6295 }
6296 }
6297 }
6298
6299 pub fn initiate_nat_traversal_round(
6309 &mut self,
6310 now: Instant,
6311 ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6312 if self.state.is_closed() {
6313 return Err(iroh_hp::Error::Closed);
6314 }
6315
6316 let client_state = self.iroh_hp.client_side_mut()?;
6317 let iroh_hp::NatTraversalRound {
6318 new_round,
6319 reach_out_at,
6320 addresses_to_probe,
6321 prev_round_path_ids,
6322 } = client_state.initiate_nat_traversal_round()?;
6323
6324 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6325
6326 for path_id in prev_round_path_ids {
6327 let validated = self
6330 .path(path_id)
6331 .map(|path| path.validated)
6332 .unwrap_or(false);
6333
6334 if !validated {
6335 let _ = self.close_path(
6336 now,
6337 path_id,
6338 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6339 );
6340 }
6341 }
6342
6343 let mut err = None;
6344
6345 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6346 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6347 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6348
6349 for (id, address) in addresses_to_probe {
6350 match self.open_nat_traversal_path(now, address, ipv6) {
6351 Ok(None) => {}
6352 Ok(Some((path_id, remote, path_was_known))) => {
6353 if !path_was_known {
6354 path_ids.push(path_id);
6355 probed_addresses.push(remote);
6356 }
6357 }
6358 Err(e) => {
6359 self.iroh_hp
6360 .client_side_mut()
6361 .expect("validated")
6362 .report_in_continuation(id, e);
6363 err.get_or_insert(e);
6364 }
6365 }
6366 }
6367
6368 if let Some(err) = err {
6369 if probed_addresses.is_empty() {
6371 return Err(iroh_hp::Error::Multipath(err));
6372 }
6373 }
6374
6375 self.iroh_hp
6376 .client_side_mut()
6377 .expect("connection side validated")
6378 .set_round_path_ids(path_ids);
6379
6380 Ok(probed_addresses)
6381 }
6382
6383 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6388 let client_state = self.iroh_hp.client_side_mut().ok()?;
6389 let (id, address) = client_state.continue_nat_traversal_round()?;
6390 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6391 let open_result = self.open_nat_traversal_path(now, address, ipv6);
6392 let client_state = self.iroh_hp.client_side_mut().expect("validated");
6393 match open_result {
6394 Ok(None) => Some(true),
6395 Ok(Some((path_id, _remote, path_was_known))) => {
6396 if !path_was_known {
6397 client_state.add_round_path_id(path_id);
6398 }
6399 Some(true)
6400 }
6401 Err(e) => {
6402 client_state.report_in_continuation(id, e);
6403 Some(false)
6404 }
6405 }
6406 }
6407}
6408
6409impl fmt::Debug for Connection {
6410 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6411 f.debug_struct("Connection")
6412 .field("handshake_cid", &self.handshake_cid)
6413 .finish()
6414 }
6415}
6416
/// Whether, and by what, a path is blocked.
///
/// NOTE(review): the variant descriptions below are inferred from the variant names only;
/// confirm against the code that produces `PathBlocked` values.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Presumably blocked by the anti-amplification limit.
    AntiAmplification,
    /// Presumably blocked by congestion control.
    Congestion,
    /// Presumably blocked by the pacer.
    Pacing,
}
6424
/// State that differs between the client and server side of a connection.
enum ConnectionSide {
    /// State held when this endpoint is the client.
    Client {
        /// Token taken from `token_store` for `server_name` when the side is built from
        /// `SideArgs`; empty if the store had none.
        token: Bytes,
        /// Store the token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Server name used as the key into `token_store`.
        server_name: String,
    },
    /// State held when this endpoint is the server.
    Server {
        /// Server configuration of this connection.
        server_config: Arc<ServerConfig>,
    },
}
6437
6438impl ConnectionSide {
6439 fn remote_may_migrate(&self, state: &State) -> bool {
6440 match self {
6441 Self::Server { server_config } => server_config.migration,
6442 Self::Client { .. } => {
6443 if let Some(hs) = state.as_handshake() {
6444 hs.allow_server_migration
6445 } else {
6446 false
6447 }
6448 }
6449 }
6450 }
6451
6452 fn is_client(&self) -> bool {
6453 self.side().is_client()
6454 }
6455
6456 fn is_server(&self) -> bool {
6457 self.side().is_server()
6458 }
6459
6460 fn side(&self) -> Side {
6461 match *self {
6462 Self::Client { .. } => Side::Client,
6463 Self::Server { .. } => Side::Server,
6464 }
6465 }
6466}
6467
6468impl From<SideArgs> for ConnectionSide {
6469 fn from(side: SideArgs) -> Self {
6470 match side {
6471 SideArgs::Client {
6472 token_store,
6473 server_name,
6474 } => Self::Client {
6475 token: token_store.take(&server_name).unwrap_or_default(),
6476 token_store,
6477 server_name,
6478 },
6479 SideArgs::Server {
6480 server_config,
6481 pref_addr_cid: _,
6482 path_validated: _,
6483 } => Self::Server { server_config },
6484 }
6485 }
6486}
6487
/// Side-specific arguments from which a `ConnectionSide` is built.
pub(crate) enum SideArgs {
    /// Arguments for a client-side connection.
    Client {
        /// Store from which a token for `server_name` is taken.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server, used as the key into `token_store`.
        server_name: String,
    },
    /// Arguments for a server-side connection.
    Server {
        /// Server configuration for the connection.
        server_config: Arc<ServerConfig>,
        /// Connection ID for the preferred address, if any.
        /// NOTE(review): meaning inferred from the field name; confirm at the call site.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path counts as validated; returned by `SideArgs::path_validated`.
        path_validated: bool,
    },
}
6500
6501impl SideArgs {
6502 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6503 match *self {
6504 Self::Client { .. } => None,
6505 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6506 }
6507 }
6508
6509 pub(crate) fn path_validated(&self) -> bool {
6510 match *self {
6511 Self::Client { .. } => true,
6512 Self::Server { path_validated, .. } => path_validated,
6513 }
6514 }
6515
6516 pub(crate) fn side(&self) -> Side {
6517 match *self {
6518 Self::Client { .. } => Side::Client,
6519 Self::Server { .. } => Side::Server,
6520 }
6521 }
6522}
6523
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport error; displayed transparently as the wrapped `TransportError`.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The connection was aborted by the peer with a connection-level close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The connection was closed by the peer with an application-level close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed.
    /// NOTE(review): the variant name suggests the local side closed it; confirm at the
    /// places that construct this variant.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
6558
6559impl From<Close> for ConnectionError {
6560 fn from(x: Close) -> Self {
6561 match x {
6562 Close::Connection(reason) => Self::ConnectionClosed(reason),
6563 Close::Application(reason) => Self::ApplicationClosed(reason),
6564 }
6565 }
6566}
6567
6568impl From<ConnectionError> for io::Error {
6570 fn from(x: ConnectionError) -> Self {
6571 use ConnectionError::*;
6572 let kind = match x {
6573 TimedOut => io::ErrorKind::TimedOut,
6574 Reset => io::ErrorKind::ConnectionReset,
6575 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6576 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6577 io::ErrorKind::Other
6578 }
6579 };
6580 Self::new(kind, x)
6581 }
6582}
6583
6584#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6587pub enum PathError {
6588 #[error("multipath extension not negotiated")]
6590 MultipathNotNegotiated,
6591 #[error("the server side may not open a path")]
6593 ServerSideNotAllowed,
6594 #[error("maximum number of concurrent paths reached")]
6596 MaxPathIdReached,
6597 #[error("remoted CIDs exhausted")]
6599 RemoteCidsExhausted,
6600 #[error("path validation failed")]
6602 ValidationFailed,
6603 #[error("invalid remote address")]
6605 InvalidRemoteAddress(SocketAddr),
6606}
6607
6608#[derive(Debug, Error, Clone, Eq, PartialEq)]
6610pub enum ClosePathError {
6611 #[error("closed path")]
6613 ClosedPath,
6614 #[error("last open path")]
6616 LastOpenPath,
6617}
6618
6619#[derive(Debug, Error, Clone, Copy)]
6620#[error("Multipath extension not negotiated")]
6621pub struct MultipathNotNegotiated {
6622 _private: (),
6623}
6624
6625#[derive(Debug)]
6627pub enum Event {
6628 HandshakeDataReady,
6630 Connected,
6632 HandshakeConfirmed,
6634 ConnectionLost {
6638 reason: ConnectionError,
6640 },
6641 Stream(StreamEvent),
6643 DatagramReceived,
6645 DatagramsUnblocked,
6647 Path(PathEvent),
6649 NatTraversal(iroh_hp::Event),
6651}
6652
6653impl From<PathEvent> for Event {
6654 fn from(source: PathEvent) -> Self {
6655 Self::Path(source)
6656 }
6657}
6658
6659fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6660 Duration::from_micros(params.max_ack_delay.0 * 1000)
6661}
6662
/// Upper bound for a backoff exponent.
// NOTE(review): presumably caps exponential timer backoff so shifts cannot overflow; confirm
// at the use site.
const MAX_BACKOFF_EXPONENT: u32 = 16;
6665
6666const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6674
6675const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6681 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6682
/// Margin used in connection with key updates.
// NOTE(review): presumably the number of packets before a key-usage limit at which a key
// update is started; confirm at the use site.
const KEY_UPDATE_MARGIN: u64 = 10_000;
6687
6688#[derive(Default)]
6689struct SentFrames {
6690 retransmits: ThinRetransmits,
6691 largest_acked: FxHashMap<PathId, u64>,
6693 stream_frames: StreamMetaVec,
6694 non_retransmits: bool,
6696 requires_padding: bool,
6698}
6699
6700impl SentFrames {
6701 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6703 !self.largest_acked.is_empty()
6704 && !self.non_retransmits
6705 && self.stream_frames.is_empty()
6706 && self.retransmits.is_empty(streams)
6707 }
6708}
6709
6710fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6718 match (x, y) {
6719 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6720 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6721 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6722 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6723 }
6724}
6725
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the idle-timeout negotiation table and that swapping the arguments never
    /// changes the outcome.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let param = |v: u64| Some(VarInt(v));
        let millis = |v: u64| Some(Duration::from_millis(v));

        // (one side, other side, expected negotiated timeout)
        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, param(0), None),
            (None, param(2), millis(2)),
            (param(0), param(0), None),
            (param(2), param(0), millis(2)),
            (param(1), param(4), millis(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}