1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::{NonZeroU32, NonZeroUsize},
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23 TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 coding::BufMutExt,
27 config::{ServerConfig, TransportConfig},
28 congestion::Controller,
29 connection::{
30 paths::AbandonState,
31 qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
32 spaces::LostPacket,
33 timer::{ConnTimer, PathTimer},
34 },
35 crypto::{self, KeyPair, Keys, PacketKey},
36 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
37 iroh_hp,
38 packet::{
39 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
40 PacketNumber, PartialDecode, SpaceId,
41 },
42 range_set::ArrayRangeSet,
43 shared::{
44 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
45 EndpointEvent, EndpointEventInner,
46 },
47 token::{ResetToken, Token, TokenPayload},
48 transport_parameters::TransportParameters,
49};
50
51mod ack_frequency;
52use ack_frequency::AckFrequencyState;
53
54mod assembler;
55pub use assembler::Chunk;
56
57mod cid_state;
58use cid_state::CidState;
59
60mod datagrams;
61use datagrams::DatagramState;
62pub use datagrams::{Datagrams, SendDatagramError};
63
64mod mtud;
65mod pacing;
66
67mod packet_builder;
68use packet_builder::{PacketBuilder, PadDatagram};
69
70mod packet_crypto;
71use packet_crypto::{PrevCrypto, ZeroRttCrypto};
72
73mod paths;
74pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
75use paths::{PathData, PathState};
76
77pub(crate) mod qlog;
78
79mod send_buffer;
80
81mod spaces;
82#[cfg(fuzzing)]
83pub use spaces::Retransmits;
84#[cfg(not(fuzzing))]
85use spaces::Retransmits;
86use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
87
88mod stats;
89pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
90
91mod streams;
92#[cfg(fuzzing)]
93pub use streams::StreamsState;
94#[cfg(not(fuzzing))]
95use streams::StreamsState;
96pub use streams::{
97 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
98 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
99};
100
101mod timer;
102use timer::{Timer, TimerTable};
103
104mod transmit_buf;
105use transmit_buf::TransmitBuf;
106
107mod state;
108
109#[cfg(not(fuzzing))]
110use state::State;
111#[cfg(fuzzing)]
112pub use state::State;
113use state::StateType;
114
115pub struct Connection {
155 endpoint_config: Arc<EndpointConfig>,
156 config: Arc<TransportConfig>,
157 rng: StdRng,
158 crypto: Box<dyn crypto::Session>,
159 handshake_cid: ConnectionId,
161 rem_handshake_cid: ConnectionId,
163 local_ip: Option<IpAddr>,
166 paths: BTreeMap<PathId, PathState>,
172 path_counter: u64,
176 allow_mtud: bool,
178 state: State,
179 side: ConnectionSide,
180 zero_rtt_enabled: bool,
182 zero_rtt_crypto: Option<ZeroRttCrypto>,
184 key_phase: bool,
185 key_phase_size: u64,
187 peer_params: TransportParameters,
189 orig_rem_cid: ConnectionId,
191 initial_dst_cid: ConnectionId,
193 retry_src_cid: Option<ConnectionId>,
196 events: VecDeque<Event>,
198 endpoint_events: VecDeque<EndpointEventInner>,
199 spin_enabled: bool,
201 spin: bool,
203 spaces: [PacketSpace; 3],
205 highest_space: SpaceId,
207 prev_crypto: Option<PrevCrypto>,
209 next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
214 accepted_0rtt: bool,
215 permit_idle_reset: bool,
217 idle_timeout: Option<Duration>,
219 timers: TimerTable,
220 authentication_failures: u64,
222
223 close: bool,
228
229 ack_frequency: AckFrequencyState,
233
234 receiving_ecn: bool,
239 total_authed_packets: u64,
241 app_limited: bool,
244
245 next_observed_addr_seq_no: VarInt,
250
251 streams: StreamsState,
252 rem_cids: FxHashMap<PathId, CidQueue>,
258 local_cid_state: FxHashMap<PathId, CidState>,
265 datagrams: DatagramState,
267 stats: ConnectionStats,
269 path_stats: FxHashMap<PathId, PathStats>,
271 version: u32,
273
274 max_concurrent_paths: NonZeroU32,
283 local_max_path_id: PathId,
298 remote_max_path_id: PathId,
304 max_path_id_with_cids: PathId,
310 abandoned_paths: FxHashSet<PathId>,
318
319 iroh_hp: iroh_hp::State,
320 qlog: QlogSink,
321}
322
323impl Connection {
324 pub(crate) fn new(
325 endpoint_config: Arc<EndpointConfig>,
326 config: Arc<TransportConfig>,
327 init_cid: ConnectionId,
328 loc_cid: ConnectionId,
329 rem_cid: ConnectionId,
330 remote: SocketAddr,
331 local_ip: Option<IpAddr>,
332 crypto: Box<dyn crypto::Session>,
333 cid_gen: &dyn ConnectionIdGenerator,
334 now: Instant,
335 version: u32,
336 allow_mtud: bool,
337 rng_seed: [u8; 32],
338 side_args: SideArgs,
339 qlog: QlogSink,
340 ) -> Self {
341 let pref_addr_cid = side_args.pref_addr_cid();
342 let path_validated = side_args.path_validated();
343 let connection_side = ConnectionSide::from(side_args);
344 let side = connection_side.side();
345 let mut rng = StdRng::from_seed(rng_seed);
346 let initial_space = {
347 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
348 space.crypto = Some(crypto.initial_keys(init_cid, side));
349 space
350 };
351 let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
352 #[cfg(test)]
353 let data_space = match config.deterministic_packet_numbers {
354 true => PacketSpace::new_deterministic(now, SpaceId::Data),
355 false => PacketSpace::new(now, SpaceId::Data, &mut rng),
356 };
357 #[cfg(not(test))]
358 let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
359 let state = State::handshake(state::Handshake {
360 rem_cid_set: side.is_server(),
361 expected_token: Bytes::new(),
362 client_hello: None,
363 allow_server_migration: side.is_client(),
364 });
365 let local_cid_state = FxHashMap::from_iter([(
366 PathId::ZERO,
367 CidState::new(
368 cid_gen.cid_len(),
369 cid_gen.cid_lifetime(),
370 now,
371 if pref_addr_cid.is_some() { 2 } else { 1 },
372 ),
373 )]);
374
375 let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
376 path.open = true;
378 let mut this = Self {
379 endpoint_config,
380 crypto,
381 handshake_cid: loc_cid,
382 rem_handshake_cid: rem_cid,
383 local_cid_state,
384 paths: BTreeMap::from_iter([(
385 PathId::ZERO,
386 PathState {
387 data: path,
388 prev: None,
389 },
390 )]),
391 path_counter: 0,
392 allow_mtud,
393 local_ip,
394 state,
395 side: connection_side,
396 zero_rtt_enabled: false,
397 zero_rtt_crypto: None,
398 key_phase: false,
399 key_phase_size: rng.random_range(10..1000),
406 peer_params: TransportParameters::default(),
407 orig_rem_cid: rem_cid,
408 initial_dst_cid: init_cid,
409 retry_src_cid: None,
410 events: VecDeque::new(),
411 endpoint_events: VecDeque::new(),
412 spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
413 spin: false,
414 spaces: [initial_space, handshake_space, data_space],
415 highest_space: SpaceId::Initial,
416 prev_crypto: None,
417 next_crypto: None,
418 accepted_0rtt: false,
419 permit_idle_reset: true,
420 idle_timeout: match config.max_idle_timeout {
421 None | Some(VarInt(0)) => None,
422 Some(dur) => Some(Duration::from_millis(dur.0)),
423 },
424 timers: TimerTable::default(),
425 authentication_failures: 0,
426 close: false,
427
428 ack_frequency: AckFrequencyState::new(get_max_ack_delay(
429 &TransportParameters::default(),
430 )),
431
432 app_limited: false,
433 receiving_ecn: false,
434 total_authed_packets: 0,
435
436 next_observed_addr_seq_no: 0u32.into(),
437
438 streams: StreamsState::new(
439 side,
440 config.max_concurrent_uni_streams,
441 config.max_concurrent_bidi_streams,
442 config.send_window,
443 config.receive_window,
444 config.stream_receive_window,
445 ),
446 datagrams: DatagramState::default(),
447 config,
448 rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
449 rng,
450 stats: ConnectionStats::default(),
451 path_stats: Default::default(),
452 version,
453
454 max_concurrent_paths: NonZeroU32::MIN,
456 local_max_path_id: PathId::ZERO,
457 remote_max_path_id: PathId::ZERO,
458 max_path_id_with_cids: PathId::ZERO,
459 abandoned_paths: Default::default(),
460
461 iroh_hp: Default::default(),
463 qlog,
464 };
465 if path_validated {
466 this.on_path_validated(PathId::ZERO);
467 }
468 if side.is_client() {
469 this.write_crypto();
471 this.init_0rtt(now);
472 }
473 this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
474 this
475 }
476
477 #[must_use]
485 pub fn poll_timeout(&mut self) -> Option<Instant> {
486 self.timers.peek()
487 }
488
489 #[must_use]
495 pub fn poll(&mut self) -> Option<Event> {
496 if let Some(x) = self.events.pop_front() {
497 return Some(x);
498 }
499
500 if let Some(event) = self.streams.poll() {
501 return Some(Event::Stream(event));
502 }
503
504 if let Some(reason) = self.state.take_error() {
505 return Some(Event::ConnectionLost { reason });
506 }
507
508 None
509 }
510
511 #[must_use]
513 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
514 self.endpoint_events.pop_front().map(EndpointEvent)
515 }
516
517 #[must_use]
519 pub fn streams(&mut self) -> Streams<'_> {
520 Streams {
521 state: &mut self.streams,
522 conn_state: &self.state,
523 }
524 }
525
526 #[must_use]
528 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
529 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
530 RecvStream {
531 id,
532 state: &mut self.streams,
533 pending: &mut self.spaces[SpaceId::Data].pending,
534 }
535 }
536
537 #[must_use]
539 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
540 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
541 SendStream {
542 id,
543 state: &mut self.streams,
544 pending: &mut self.spaces[SpaceId::Data].pending,
545 conn_state: &self.state,
546 }
547 }
548
549 pub fn open_path_ensure(
556 &mut self,
557 remote: SocketAddr,
558 initial_status: PathStatus,
559 now: Instant,
560 ) -> Result<(PathId, bool), PathError> {
561 match self
562 .paths
563 .iter()
564 .find(|(_id, path)| path.data.remote == remote)
565 {
566 Some((path_id, _state)) => Ok((*path_id, true)),
567 None => self
568 .open_path(remote, initial_status, now)
569 .map(|id| (id, false)),
570 }
571 }
572
573 pub fn open_path(
578 &mut self,
579 remote: SocketAddr,
580 initial_status: PathStatus,
581 now: Instant,
582 ) -> Result<PathId, PathError> {
583 if !self.is_multipath_negotiated() {
584 return Err(PathError::MultipathNotNegotiated);
585 }
586 if self.side().is_server() {
587 return Err(PathError::ServerSideNotAllowed);
588 }
589
590 let max_abandoned = self.abandoned_paths.iter().max().copied();
591 let max_used = self.paths.keys().last().copied();
592 let path_id = max_abandoned
593 .max(max_used)
594 .unwrap_or(PathId::ZERO)
595 .saturating_add(1u8);
596
597 if Some(path_id) > self.max_path_id() {
598 return Err(PathError::MaxPathIdReached);
599 }
600 if path_id > self.remote_max_path_id {
601 self.spaces[SpaceId::Data].pending.paths_blocked = true;
602 return Err(PathError::MaxPathIdReached);
603 }
604 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
605 self.spaces[SpaceId::Data]
606 .pending
607 .path_cids_blocked
608 .push(path_id);
609 return Err(PathError::RemoteCidsExhausted);
610 }
611
612 let path = self.ensure_path(path_id, remote, now, None);
613 path.status.local_update(initial_status);
614
615 Ok(path_id)
616 }
617
618 pub fn close_path(
624 &mut self,
625 now: Instant,
626 path_id: PathId,
627 error_code: VarInt,
628 ) -> Result<(), ClosePathError> {
629 if self.abandoned_paths.contains(&path_id)
630 || Some(path_id) > self.max_path_id()
631 || !self.paths.contains_key(&path_id)
632 {
633 return Err(ClosePathError::ClosedPath);
634 }
635 if self
636 .paths
637 .iter()
638 .any(|(id, path)| {
640 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
641 })
642 .not()
643 {
644 return Err(ClosePathError::LastOpenPath);
645 }
646
647 self.spaces[SpaceId::Data]
649 .pending
650 .path_abandon
651 .insert(path_id, error_code.into());
652
653 let pending_space = &mut self.spaces[SpaceId::Data].pending;
655 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
656 pending_space.path_cids_blocked.retain(|&id| id != path_id);
657 pending_space.path_status.retain(|&id| id != path_id);
658
659 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
661 for sent_packet in space.sent_packets.values_mut() {
662 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
663 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
664 retransmits.path_cids_blocked.retain(|&id| id != path_id);
665 retransmits.path_status.retain(|&id| id != path_id);
666 }
667 }
668 }
669
670 self.rem_cids.remove(&path_id);
676 self.endpoint_events
677 .push_back(EndpointEventInner::RetireResetToken(path_id));
678
679 self.abandoned_paths.insert(path_id);
680
681 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
682
683 Ok(())
684 }
685
686 #[track_caller]
690 fn path_data(&self, path_id: PathId) -> &PathData {
691 if let Some(data) = self.paths.get(&path_id) {
692 &data.data
693 } else {
694 panic!(
695 "unknown path: {path_id}, currently known paths: {:?}",
696 self.paths.keys().collect::<Vec<_>>()
697 );
698 }
699 }
700
701 fn path(&self, path_id: PathId) -> Option<&PathData> {
703 self.paths.get(&path_id).map(|path_state| &path_state.data)
704 }
705
706 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
708 self.paths
709 .get_mut(&path_id)
710 .map(|path_state| &mut path_state.data)
711 }
712
713 pub fn paths(&self) -> Vec<PathId> {
717 self.paths.keys().copied().collect()
718 }
719
720 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
722 self.path(path_id)
723 .map(PathData::local_status)
724 .ok_or(ClosedPath { _private: () })
725 }
726
727 pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
729 self.path(path_id)
730 .map(|path| path.remote)
731 .ok_or(ClosedPath { _private: () })
732 }
733
734 pub fn set_path_status(
738 &mut self,
739 path_id: PathId,
740 status: PathStatus,
741 ) -> Result<PathStatus, SetPathStatusError> {
742 if !self.is_multipath_negotiated() {
743 return Err(SetPathStatusError::MultipathNotNegotiated);
744 }
745 let path = self
746 .path_mut(path_id)
747 .ok_or(SetPathStatusError::ClosedPath)?;
748 let prev = match path.status.local_update(status) {
749 Some(prev) => {
750 self.spaces[SpaceId::Data]
751 .pending
752 .path_status
753 .insert(path_id);
754 prev
755 }
756 None => path.local_status(),
757 };
758 Ok(prev)
759 }
760
761 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
766 self.path(path_id).and_then(|path| path.remote_status())
767 }
768
769 pub fn set_path_max_idle_timeout(
775 &mut self,
776 path_id: PathId,
777 timeout: Option<Duration>,
778 ) -> Result<Option<Duration>, ClosedPath> {
779 let path = self
780 .paths
781 .get_mut(&path_id)
782 .ok_or(ClosedPath { _private: () })?;
783 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
784 }
785
786 pub fn set_path_keep_alive_interval(
792 &mut self,
793 path_id: PathId,
794 interval: Option<Duration>,
795 ) -> Result<Option<Duration>, ClosedPath> {
796 let path = self
797 .paths
798 .get_mut(&path_id)
799 .ok_or(ClosedPath { _private: () })?;
800 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
801 }
802
803 #[track_caller]
807 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
808 &mut self.paths.get_mut(&path_id).expect("known path").data
809 }
810
811 fn ensure_path(
812 &mut self,
813 path_id: PathId,
814 remote: SocketAddr,
815 now: Instant,
816 pn: Option<u64>,
817 ) -> &mut PathData {
818 let vacant_entry = match self.paths.entry(path_id) {
819 btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
820 btree_map::Entry::Occupied(occupied_entry) => {
821 return &mut occupied_entry.into_mut().data;
822 }
823 };
824
825 debug!(%path_id, ?remote, "path added");
828 let peer_max_udp_payload_size =
829 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
830 self.path_counter = self.path_counter.wrapping_add(1);
831 let mut data = PathData::new(
832 remote,
833 self.allow_mtud,
834 Some(peer_max_udp_payload_size),
835 self.path_counter,
836 now,
837 &self.config,
838 );
839
840 let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
841 self.timers.set(
842 Timer::PerPath(path_id, PathTimer::PathOpen),
843 now + 3 * pto,
844 self.qlog.with_time(now),
845 );
846
847 data.send_new_challenge = true;
850
851 let path = vacant_entry.insert(PathState { data, prev: None });
852
853 let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
854 if let Some(pn) = pn {
855 pn_space.dedup.insert(pn);
856 }
857 self.spaces[SpaceId::Data]
858 .number_spaces
859 .insert(path_id, pn_space);
860 self.qlog.emit_tuple_assigned(path_id, remote, now);
861 &mut path.data
862 }
863
864 #[must_use]
874 pub fn poll_transmit(
875 &mut self,
876 now: Instant,
877 max_datagrams: NonZeroUsize,
878 buf: &mut Vec<u8>,
879 ) -> Option<Transmit> {
880 if let Some(probing) = self
881 .iroh_hp
882 .server_side_mut()
883 .ok()
884 .and_then(iroh_hp::ServerState::next_probe)
885 {
886 let destination = probing.remote();
887 trace!(%destination, "RAND_DATA packet");
888 let token: u64 = self.rng.random();
889 buf.put_u64(token);
890 probing.finish(token);
891 return Some(Transmit {
892 destination,
893 ecn: None,
894 size: 8,
895 segment_size: None,
896 src_ip: None,
897 });
898 }
899
900 let max_datagrams = match self.config.enable_segmentation_offload {
901 false => NonZeroUsize::MIN,
902 true => max_datagrams,
903 };
904
905 let close = match self.state.as_type() {
924 StateType::Drained => {
925 self.app_limited = true;
926 return None;
927 }
928 StateType::Draining | StateType::Closed => {
929 if !self.close {
932 self.app_limited = true;
933 return None;
934 }
935 true
936 }
937 _ => false,
938 };
939
940 if let Some(config) = &self.config.ack_frequency_config {
942 let rtt = self
943 .paths
944 .values()
945 .map(|p| p.data.rtt.get())
946 .min()
947 .expect("one path exists");
948 self.spaces[SpaceId::Data].pending.ack_frequency = self
949 .ack_frequency
950 .should_send_ack_frequency(rtt, config, &self.peer_params)
951 && self.highest_space == SpaceId::Data
952 && self.peer_supports_ack_frequency();
953 }
954
955 let mut coalesce = true;
957
958 let mut pad_datagram = PadDatagram::No;
961
962 let mut congestion_blocked = false;
966
967 let mut last_packet_number = None;
969
970 let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
971
972 let have_available_path = self.paths.iter().any(|(id, path)| {
975 path.data.validated
976 && path.data.local_status() == PathStatus::Available
977 && self.rem_cids.contains_key(id)
978 });
979
980 let mut transmit = TransmitBuf::new(
982 buf,
983 max_datagrams,
984 self.path_data(path_id).current_mtu().into(),
985 );
986 if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
987 return Some(challenge);
988 }
989 let mut space_id = match path_id {
990 PathId::ZERO => SpaceId::Initial,
991 _ => SpaceId::Data,
992 };
993
994 loop {
995 let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
997 let err = PathError::RemoteCidsExhausted;
998 if !self.abandoned_paths.contains(&path_id) {
999 debug!(?err, %path_id, "no active CID for path");
1000 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1001 id: path_id,
1002 error: err,
1003 }));
1004 self.close_path(
1008 now,
1009 path_id,
1010 TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1011 )
1012 .ok();
1013 self.spaces[SpaceId::Data]
1014 .pending
1015 .path_cids_blocked
1016 .push(path_id);
1017 } else {
1018 trace!(%path_id, "remote CIDs retired for abandoned path");
1019 }
1020
1021 match self.paths.keys().find(|&&next| next > path_id) {
1022 Some(next_path_id) => {
1023 path_id = *next_path_id;
1025 space_id = SpaceId::Data;
1026
1027 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1029 if let Some(challenge) =
1030 self.send_prev_path_challenge(now, &mut transmit, path_id)
1031 {
1032 return Some(challenge);
1033 }
1034
1035 continue;
1036 }
1037 None => {
1038 trace!(
1040 ?space_id,
1041 %path_id,
1042 "no CIDs to send on path, no more paths"
1043 );
1044 break;
1045 }
1046 }
1047 };
1048
1049 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1052 transmit.datagram_remaining_mut()
1054 } else {
1055 transmit.segment_size()
1057 };
1058 let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1059 let path_should_send = {
1060 let path_exclusive_only = space_id == SpaceId::Data
1061 && have_available_path
1062 && self.path_data(path_id).local_status() == PathStatus::Backup;
1063 let path_should_send = if path_exclusive_only {
1064 can_send.path_exclusive
1065 } else {
1066 !can_send.is_empty()
1067 };
1068 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1069 path_should_send || needs_loss_probe
1070 };
1071
1072 if !path_should_send && space_id < SpaceId::Data {
1073 if self.spaces[space_id].crypto.is_some() {
1074 trace!(?space_id, %path_id, "nothing to send in space");
1075 }
1076 space_id = space_id.next();
1077 continue;
1078 }
1079
1080 let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1081 self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1083 } else {
1084 PathBlocked::No
1085 };
1086 if send_blocked != PathBlocked::No {
1087 trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1088 congestion_blocked = true;
1089 }
1090 if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1091 space_id = space_id.next();
1094 continue;
1095 }
1096 if !path_should_send || send_blocked != PathBlocked::No {
1097 if transmit.num_datagrams() > 0 {
1102 break;
1103 }
1104
1105 match self.paths.keys().find(|&&next| next > path_id) {
1106 Some(next_path_id) => {
1107 trace!(
1109 ?space_id,
1110 %path_id,
1111 %next_path_id,
1112 "nothing to send on path"
1113 );
1114 path_id = *next_path_id;
1115 space_id = SpaceId::Data;
1116
1117 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1119 if let Some(challenge) =
1120 self.send_prev_path_challenge(now, &mut transmit, path_id)
1121 {
1122 return Some(challenge);
1123 }
1124
1125 continue;
1126 }
1127 None => {
1128 trace!(
1130 ?space_id,
1131 %path_id,
1132 next_path_id=?None::<PathId>,
1133 "nothing to send on path"
1134 );
1135 break;
1136 }
1137 }
1138 }
1139
1140 if transmit.datagram_remaining_mut() == 0 {
1142 if transmit.num_datagrams() >= transmit.max_datagrams().get() {
1143 break;
1145 }
1146
1147 match self.spaces[space_id].for_path(path_id).loss_probes {
1148 0 => transmit.start_new_datagram(),
1149 _ => {
1150 let request_immediate_ack =
1152 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1153 self.spaces[space_id].maybe_queue_probe(
1154 path_id,
1155 request_immediate_ack,
1156 &self.streams,
1157 );
1158
1159 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1160
1161 transmit.start_new_datagram_with_size(std::cmp::min(
1165 usize::from(INITIAL_MTU),
1166 transmit.segment_size(),
1167 ));
1168 }
1169 }
1170 trace!(count = transmit.num_datagrams(), "new datagram started");
1171 coalesce = true;
1172 pad_datagram = PadDatagram::No;
1173 }
1174
1175 if transmit.datagram_start_offset() < transmit.len() {
1178 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1179 }
1180
1181 if self.spaces[SpaceId::Initial].crypto.is_some()
1186 && space_id == SpaceId::Handshake
1187 && self.side.is_client()
1188 {
1189 self.discard_space(now, SpaceId::Initial);
1192 }
1193 if let Some(ref mut prev) = self.prev_crypto {
1194 prev.update_unacked = false;
1195 }
1196
1197 let mut qlog = QlogSentPacket::default();
1198 let mut builder = PacketBuilder::new(
1199 now,
1200 space_id,
1201 path_id,
1202 remote_cid,
1203 &mut transmit,
1204 can_send.other,
1205 self,
1206 &mut qlog,
1207 )?;
1208 last_packet_number = Some(builder.exact_number);
1209 coalesce = coalesce && !builder.short_header;
1210
1211 if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1212 pad_datagram |= PadDatagram::ToMinMtu;
1214 }
1215 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1216 pad_datagram |= PadDatagram::ToSegmentSize;
1217 }
1218
1219 if can_send.close {
1220 trace!("sending CONNECTION_CLOSE");
1221 let mut sent_frames = SentFrames::default();
1226 let is_multipath_negotiated = self.is_multipath_negotiated();
1227 for path_id in self.spaces[space_id]
1228 .number_spaces
1229 .iter()
1230 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1231 .map(|(&path_id, _)| path_id)
1232 .collect::<Vec<_>>()
1233 {
1234 Self::populate_acks(
1235 now,
1236 self.receiving_ecn,
1237 &mut sent_frames,
1238 path_id,
1239 space_id,
1240 &mut self.spaces[space_id],
1241 is_multipath_negotiated,
1242 &mut builder.frame_space_mut(),
1243 &mut self.stats,
1244 &mut qlog,
1245 );
1246 }
1247
1248 debug_assert!(
1252 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1253 "ACKs should leave space for ConnectionClose"
1254 );
1255 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1256 let max_frame_size = builder.frame_space_remaining();
1257 match self.state.as_type() {
1258 StateType::Closed => {
1259 let reason: Close =
1260 self.state.as_closed().expect("checked").clone().into();
1261 if space_id == SpaceId::Data || reason.is_transport_layer() {
1262 reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1263 qlog.frame(&Frame::Close(reason));
1264 } else {
1265 let frame = frame::ConnectionClose {
1266 error_code: TransportErrorCode::APPLICATION_ERROR,
1267 frame_type: None,
1268 reason: Bytes::new(),
1269 };
1270 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1271 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1272 }
1273 }
1274 StateType::Draining => {
1275 let frame = frame::ConnectionClose {
1276 error_code: TransportErrorCode::NO_ERROR,
1277 frame_type: None,
1278 reason: Bytes::new(),
1279 };
1280 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1281 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1282 }
1283 _ => unreachable!(
1284 "tried to make a close packet when the connection wasn't closed"
1285 ),
1286 };
1287 }
1288 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1289 if space_id == self.highest_space {
1290 self.close = false;
1293 break;
1295 } else {
1296 space_id = space_id.next();
1300 continue;
1301 }
1302 }
1303
1304 if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1307 let path = self.path_data_mut(path_id);
1308 if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1309 let response = frame::PathResponse(token);
1313 trace!(%response, "(off-path)");
1314 builder.frame_space_mut().write(response);
1315 qlog.frame(&Frame::PathResponse(response));
1316 self.stats.frame_tx.path_response += 1;
1317 builder.finish_and_track(
1318 now,
1319 self,
1320 path_id,
1321 SentFrames {
1322 non_retransmits: true,
1323 ..SentFrames::default()
1324 },
1325 PadDatagram::ToMinMtu,
1326 qlog,
1327 );
1328 self.stats.udp_tx.on_sent(1, transmit.len());
1329 return Some(Transmit {
1330 destination: remote,
1331 size: transmit.len(),
1332 ecn: None,
1333 segment_size: None,
1334 src_ip: self.local_ip,
1335 });
1336 }
1337 }
1338
1339 let sent_frames = {
1340 let path_exclusive_only = have_available_path
1341 && self.path_data(path_id).local_status() == PathStatus::Backup;
1342 let pn = builder.exact_number;
1343 self.populate_packet(
1344 now,
1345 space_id,
1346 path_id,
1347 path_exclusive_only,
1348 &mut builder.frame_space_mut(),
1349 pn,
1350 &mut qlog,
1351 )
1352 };
1353
1354 debug_assert!(
1361 !(sent_frames.is_ack_only(&self.streams)
1362 && !can_send.acks
1363 && can_send.other
1364 && builder.buf.segment_size()
1365 == self.path_data(path_id).current_mtu() as usize
1366 && self.datagrams.outgoing.is_empty()),
1367 "SendableFrames was {can_send:?}, but only ACKs have been written"
1368 );
1369 if sent_frames.requires_padding {
1370 pad_datagram |= PadDatagram::ToMinMtu;
1371 }
1372
1373 for (path_id, _pn) in sent_frames.largest_acked.iter() {
1374 self.spaces[space_id]
1375 .for_path(*path_id)
1376 .pending_acks
1377 .acks_sent();
1378 self.timers.stop(
1379 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1380 self.qlog.with_time(now),
1381 );
1382 }
1383
1384 if coalesce
1392 && builder
1393 .buf
1394 .datagram_remaining_mut()
1395 .saturating_sub(builder.predict_packet_end())
1396 > MIN_PACKET_SPACE
1397 && self
1398 .next_send_space(space_id, path_id, builder.buf, close)
1399 .is_some()
1400 {
1401 builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1404 } else {
1405 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1408 const MAX_PADDING: usize = 32;
1416 if builder.buf.datagram_remaining_mut()
1417 > builder.predict_packet_end() + MAX_PADDING
1418 {
1419 trace!(
1420 "GSO truncated by demand for {} padding bytes",
1421 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1422 );
1423 builder.finish_and_track(
1424 now,
1425 self,
1426 path_id,
1427 sent_frames,
1428 PadDatagram::No,
1429 qlog,
1430 );
1431 break;
1432 }
1433
1434 builder.finish_and_track(
1437 now,
1438 self,
1439 path_id,
1440 sent_frames,
1441 PadDatagram::ToSegmentSize,
1442 qlog,
1443 );
1444 } else {
1445 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1446 }
1447 if transmit.num_datagrams() == 1 {
1448 transmit.clip_datagram_size();
1449 }
1450 }
1451 }
1452
1453 if let Some(last_packet_number) = last_packet_number {
1454 self.path_data_mut(path_id).congestion.on_sent(
1457 now,
1458 transmit.len() as u64,
1459 last_packet_number,
1460 );
1461 }
1462
1463 self.qlog.emit_recovery_metrics(
1464 path_id,
1465 &mut self.paths.get_mut(&path_id).unwrap().data,
1466 now,
1467 );
1468
1469 self.app_limited = transmit.is_empty() && !congestion_blocked;
1470
1471 if transmit.is_empty() && self.state.is_established() {
1473 let space_id = SpaceId::Data;
1475 path_id = *self.paths.first_key_value().expect("one path must exist").0;
1476 let probe_data = loop {
1477 let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1483 let eligible = self.path_data(path_id).validated
1484 && !self.path_data(path_id).is_validating_path()
1485 && !self.abandoned_paths.contains(&path_id);
1486 let probe_size = eligible
1487 .then(|| {
1488 let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1489 self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1490 })
1491 .flatten();
1492 match (active_cid, probe_size) {
1493 (Some(active_cid), Some(probe_size)) => {
1494 break Some((active_cid, probe_size));
1496 }
1497 _ => {
1498 match self.paths.keys().find(|&&next| next > path_id) {
1500 Some(next) => {
1501 path_id = *next;
1502 continue;
1503 }
1504 None => break None,
1505 }
1506 }
1507 }
1508 };
1509 if let Some((active_cid, probe_size)) = probe_data {
1510 debug_assert_eq!(transmit.num_datagrams(), 0);
1512 transmit.start_new_datagram_with_size(probe_size as usize);
1513
1514 let mut qlog = QlogSentPacket::default();
1515 let mut builder = PacketBuilder::new(
1516 now,
1517 space_id,
1518 path_id,
1519 active_cid,
1520 &mut transmit,
1521 true,
1522 self,
1523 &mut qlog,
1524 )?;
1525
1526 trace!(?probe_size, "writing MTUD probe");
1528 trace!("PING");
1529 builder.frame_space_mut().write(frame::FrameType::PING);
1530 qlog.frame(&Frame::Ping);
1531 self.stats.frame_tx.ping += 1;
1532
1533 if self.peer_supports_ack_frequency() {
1535 trace!("IMMEDIATE_ACK");
1536 builder
1537 .frame_space_mut()
1538 .write(frame::FrameType::IMMEDIATE_ACK);
1539 self.stats.frame_tx.immediate_ack += 1;
1540 qlog.frame(&Frame::ImmediateAck);
1541 }
1542
1543 let sent_frames = SentFrames {
1544 non_retransmits: true,
1545 ..Default::default()
1546 };
1547 builder.finish_and_track(
1548 now,
1549 self,
1550 path_id,
1551 sent_frames,
1552 PadDatagram::ToSize(probe_size),
1553 qlog,
1554 );
1555
1556 self.path_stats
1557 .entry(path_id)
1558 .or_default()
1559 .sent_plpmtud_probes += 1;
1560 }
1561 }
1562
1563 if transmit.is_empty() {
1564 return None;
1565 }
1566
1567 let destination = self.path_data(path_id).remote;
1568 trace!(
1569 segment_size = transmit.segment_size(),
1570 last_datagram_len = transmit.len() % transmit.segment_size(),
1571 ?destination,
1572 "sending {} bytes in {} datagrams",
1573 transmit.len(),
1574 transmit.num_datagrams()
1575 );
1576 self.path_data_mut(path_id)
1577 .inc_total_sent(transmit.len() as u64);
1578
1579 self.stats
1580 .udp_tx
1581 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1582
1583 Some(Transmit {
1584 destination,
1585 size: transmit.len(),
1586 ecn: if self.path_data(path_id).sending_ecn {
1587 Some(EcnCodepoint::Ect0)
1588 } else {
1589 None
1590 },
1591 segment_size: match transmit.num_datagrams() {
1592 1 => None,
1593 _ => Some(transmit.segment_size()),
1594 },
1595 src_ip: self.local_ip,
1596 })
1597 }
1598
1599 fn next_send_space(
1604 &mut self,
1605 current_space_id: SpaceId,
1606 path_id: PathId,
1607 buf: &TransmitBuf<'_>,
1608 close: bool,
1609 ) -> Option<SpaceId> {
1610 let mut space_id = current_space_id;
1617 loop {
1618 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1619 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1620 return Some(space_id);
1621 }
1622 space_id = match space_id {
1623 SpaceId::Initial => SpaceId::Handshake,
1624 SpaceId::Handshake => SpaceId::Data,
1625 SpaceId::Data => break,
1626 }
1627 }
1628 None
1629 }
1630
1631 fn path_congestion_check(
1633 &mut self,
1634 space_id: SpaceId,
1635 path_id: PathId,
1636 transmit: &TransmitBuf<'_>,
1637 can_send: &SendableFrames,
1638 now: Instant,
1639 ) -> PathBlocked {
1640 if self.side().is_server()
1646 && self
1647 .path_data(path_id)
1648 .anti_amplification_blocked(transmit.len() as u64 + 1)
1649 {
1650 trace!(?space_id, %path_id, "blocked by anti-amplification");
1651 return PathBlocked::AntiAmplification;
1652 }
1653
1654 let bytes_to_send = transmit.segment_size() as u64;
1657 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1658
1659 if can_send.other && !need_loss_probe && !can_send.close {
1660 let path = self.path_data(path_id);
1661 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1662 trace!(?space_id, %path_id, "blocked by congestion control");
1663 return PathBlocked::Congestion;
1664 }
1665 }
1666
1667 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1669 self.timers.set(
1670 Timer::PerPath(path_id, PathTimer::Pacing),
1671 delay,
1672 self.qlog.with_time(now),
1673 );
1674 trace!(?space_id, %path_id, "blocked by pacing");
1677 return PathBlocked::Pacing;
1678 }
1679
1680 PathBlocked::No
1681 }
1682
1683 fn send_prev_path_challenge(
1688 &mut self,
1689 now: Instant,
1690 buf: &mut TransmitBuf<'_>,
1691 path_id: PathId,
1692 ) -> Option<Transmit> {
1693 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1694 if !prev_path.send_new_challenge {
1697 return None;
1698 };
1699 prev_path.send_new_challenge = false;
1700 let destination = prev_path.remote;
1701 let token = self.rng.random();
1702 let info = paths::SentChallengeInfo {
1703 sent_instant: now,
1704 remote: destination,
1705 };
1706 prev_path.challenges_sent.insert(token, info);
1707 debug_assert_eq!(
1708 self.highest_space,
1709 SpaceId::Data,
1710 "PATH_CHALLENGE queued without 1-RTT keys"
1711 );
1712 buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1713
1714 debug_assert_eq!(buf.datagram_start_offset(), 0);
1720 let mut qlog = QlogSentPacket::default();
1721 let mut builder = PacketBuilder::new(
1722 now,
1723 SpaceId::Data,
1724 path_id,
1725 *prev_cid,
1726 buf,
1727 false,
1728 self,
1729 &mut qlog,
1730 )?;
1731 let challenge = frame::PathChallenge(token);
1732 trace!(%challenge, "validating previous path");
1733 qlog.frame(&Frame::PathChallenge(challenge));
1734 builder.frame_space_mut().write(challenge);
1735 self.stats.frame_tx.path_challenge += 1;
1736
1737 builder.pad_to(MIN_INITIAL_SIZE);
1742
1743 builder.finish(self, now, qlog);
1744 self.stats.udp_tx.on_sent(1, buf.len());
1745
1746 Some(Transmit {
1747 destination,
1748 size: buf.len(),
1749 ecn: None,
1750 segment_size: None,
1751 src_ip: self.local_ip,
1752 })
1753 }
1754
1755 fn space_can_send(
1760 &mut self,
1761 space_id: SpaceId,
1762 path_id: PathId,
1763 packet_size: usize,
1764 close: bool,
1765 ) -> SendableFrames {
1766 let pn = self.spaces[SpaceId::Data]
1767 .for_path(path_id)
1768 .peek_tx_number();
1769 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1770 if self.spaces[space_id].crypto.is_none()
1771 && (space_id != SpaceId::Data
1772 || self.zero_rtt_crypto.is_none()
1773 || self.side.is_server())
1774 {
1775 return SendableFrames::empty();
1777 }
1778 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1779 if space_id == SpaceId::Data {
1780 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1781 }
1782
1783 can_send.close = close && self.spaces[space_id].crypto.is_some();
1784
1785 can_send
1786 }
1787
1788 pub fn handle_event(&mut self, event: ConnectionEvent) {
1794 use ConnectionEventInner::*;
1795 match event.0 {
1796 Datagram(DatagramConnectionEvent {
1797 now,
1798 remote,
1799 path_id,
1800 ecn,
1801 first_decode,
1802 remaining,
1803 }) => {
1804 let span = trace_span!("pkt", %path_id);
1805 let _guard = span.enter();
1806 if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
1810 if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
1811 trace!(
1812 %path_id,
1813 ?remote,
1814 path_remote = ?self.path(path_id).map(|p| p.remote),
1815 "discarding packet from unrecognized peer"
1816 );
1817 return;
1818 }
1819 }
1820
1821 let was_anti_amplification_blocked = self
1822 .path(path_id)
1823 .map(|path| path.anti_amplification_blocked(1))
1824 .unwrap_or(true); self.stats.udp_rx.datagrams += 1;
1828 self.stats.udp_rx.bytes += first_decode.len() as u64;
1829 let data_len = first_decode.len();
1830
1831 self.handle_decode(now, remote, path_id, ecn, first_decode);
1832 if let Some(path) = self.path_mut(path_id) {
1837 path.inc_total_recvd(data_len as u64);
1838 }
1839
1840 if let Some(data) = remaining {
1841 self.stats.udp_rx.bytes += data.len() as u64;
1842 self.handle_coalesced(now, remote, path_id, ecn, data);
1843 }
1844
1845 if let Some(path) = self.paths.get_mut(&path_id) {
1846 self.qlog
1847 .emit_recovery_metrics(path_id, &mut path.data, now);
1848 }
1849
1850 if was_anti_amplification_blocked {
1851 self.set_loss_detection_timer(now, path_id);
1855 }
1856 }
1857 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1858 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1859 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1860 let cid_state = self
1861 .local_cid_state
1862 .entry(path_id)
1863 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1864 cid_state.new_cids(&ids, now);
1865
1866 ids.into_iter().rev().for_each(|frame| {
1867 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1868 });
1869 self.reset_cid_retirement(now);
1871 }
1872 }
1873 }
1874
1875 pub fn handle_timeout(&mut self, now: Instant) {
1885 while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
1886 trace!(?timer, at=?now, "timeout");
1888 match timer {
1889 Timer::Conn(timer) => match timer {
1890 ConnTimer::Close => {
1891 self.state.move_to_drained(None);
1892 self.endpoint_events.push_back(EndpointEventInner::Drained);
1893 }
1894 ConnTimer::Idle => {
1895 self.kill(ConnectionError::TimedOut);
1896 }
1897 ConnTimer::KeepAlive => {
1898 trace!("sending keep-alive");
1899 self.ping();
1900 }
1901 ConnTimer::KeyDiscard => {
1902 self.zero_rtt_crypto = None;
1903 self.prev_crypto = None;
1904 }
1905 ConnTimer::PushNewCid => {
1906 while let Some((path_id, when)) = self.next_cid_retirement() {
1907 if when > now {
1908 break;
1909 }
1910 match self.local_cid_state.get_mut(&path_id) {
1911 None => error!(%path_id, "No local CID state for path"),
1912 Some(cid_state) => {
1913 let num_new_cid = cid_state.on_cid_timeout().into();
1915 if !self.state.is_closed() {
1916 trace!(
1917 "push a new CID to peer RETIRE_PRIOR_TO field {}",
1918 cid_state.retire_prior_to()
1919 );
1920 self.endpoint_events.push_back(
1921 EndpointEventInner::NeedIdentifiers(
1922 path_id,
1923 now,
1924 num_new_cid,
1925 ),
1926 );
1927 }
1928 }
1929 }
1930 }
1931 }
1932 },
1933 Timer::PerPath(path_id, timer) => {
1935 let span = trace_span!("per-path timer fired", %path_id, ?timer);
1936 let _guard = span.enter();
1937 match timer {
1938 PathTimer::PathIdle => {
1939 self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
1940 .ok();
1941 }
1942
1943 PathTimer::PathKeepAlive => {
1944 trace!("sending keep-alive on path");
1945 self.ping_path(path_id).ok();
1946 }
1947 PathTimer::LossDetection => {
1948 self.on_loss_detection_timeout(now, path_id);
1949 self.qlog.emit_recovery_metrics(
1950 path_id,
1951 &mut self.paths.get_mut(&path_id).unwrap().data,
1952 now,
1953 );
1954 }
1955 PathTimer::PathValidation => {
1956 let Some(path) = self.paths.get_mut(&path_id) else {
1957 continue;
1958 };
1959 self.timers.stop(
1960 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
1961 self.qlog.with_time(now),
1962 );
1963 debug!("path validation failed");
1964 if let Some((_, prev)) = path.prev.take() {
1965 path.data = prev;
1966 }
1967 path.data.challenges_sent.clear();
1968 path.data.send_new_challenge = false;
1969 }
1970 PathTimer::PathChallengeLost => {
1971 let Some(path) = self.paths.get_mut(&path_id) else {
1972 continue;
1973 };
1974 trace!("path challenge deemed lost");
1975 path.data.send_new_challenge = true;
1976 }
1977 PathTimer::PathOpen => {
1978 let Some(path) = self.path_mut(path_id) else {
1979 continue;
1980 };
1981 path.challenges_sent.clear();
1982 path.send_new_challenge = false;
1983 debug!("new path validation failed");
1984 if let Err(err) = self.close_path(
1985 now,
1986 path_id,
1987 TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
1988 ) {
1989 warn!(?err, "failed closing path");
1990 }
1991
1992 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1993 id: path_id,
1994 error: PathError::ValidationFailed,
1995 }));
1996 }
1997 PathTimer::Pacing => trace!("pacing timer expired"),
1998 PathTimer::MaxAckDelay => {
1999 trace!("max ack delay reached");
2000 self.spaces[SpaceId::Data]
2002 .for_path(path_id)
2003 .pending_acks
2004 .on_max_ack_delay_timeout()
2005 }
2006 PathTimer::DiscardPath => {
2007 self.timers.stop_per_path(path_id, self.qlog.with_time(now));
2010 if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
2011 let (min_seq, max_seq) = loc_cid_state.active_seq();
2012 for seq in min_seq..=max_seq {
2013 self.endpoint_events.push_back(
2014 EndpointEventInner::RetireConnectionId(
2015 now, path_id, seq, false,
2016 ),
2017 );
2018 }
2019 }
2020 self.discard_path(path_id, now);
2021 }
2022 }
2023 }
2024 }
2025 }
2026 }
2027
2028 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2040 self.close_inner(
2041 now,
2042 Close::Application(frame::ApplicationClose { error_code, reason }),
2043 )
2044 }
2045
2046 fn close_inner(&mut self, now: Instant, reason: Close) {
2047 let was_closed = self.state.is_closed();
2048 if !was_closed {
2049 self.close_common();
2050 self.set_close_timer(now);
2051 self.close = true;
2052 self.state.move_to_closed_local(reason);
2053 }
2054 }
2055
2056 pub fn datagrams(&mut self) -> Datagrams<'_> {
2058 Datagrams { conn: self }
2059 }
2060
2061 pub fn stats(&mut self) -> ConnectionStats {
2063 self.stats.clone()
2064 }
2065
2066 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2068 let path = self.paths.get(&path_id)?;
2069 let stats = self.path_stats.entry(path_id).or_default();
2070 stats.rtt = path.data.rtt.get();
2071 stats.cwnd = path.data.congestion.window();
2072 stats.current_mtu = path.data.mtud.current_mtu();
2073 Some(*stats)
2074 }
2075
2076 pub fn ping(&mut self) {
2080 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2083 path_data.ping_pending = true;
2084 }
2085 }
2086
2087 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2091 let path_data = self.spaces[self.highest_space]
2092 .number_spaces
2093 .get_mut(&path)
2094 .ok_or(ClosedPath { _private: () })?;
2095 path_data.ping_pending = true;
2096 Ok(())
2097 }
2098
2099 pub fn force_key_update(&mut self) {
2103 if !self.state.is_established() {
2104 debug!("ignoring forced key update in illegal state");
2105 return;
2106 }
2107 if self.prev_crypto.is_some() {
2108 debug!("ignoring redundant forced key update");
2111 return;
2112 }
2113 self.update_keys(None, false);
2114 }
2115
2116 #[doc(hidden)]
2118 #[deprecated]
2119 pub fn initiate_key_update(&mut self) {
2120 self.force_key_update();
2121 }
2122
2123 pub fn crypto_session(&self) -> &dyn crypto::Session {
2125 &*self.crypto
2126 }
2127
2128 pub fn is_handshaking(&self) -> bool {
2133 self.state.is_handshake()
2134 }
2135
2136 pub fn is_closed(&self) -> bool {
2144 self.state.is_closed()
2145 }
2146
2147 pub fn is_drained(&self) -> bool {
2152 self.state.is_drained()
2153 }
2154
2155 pub fn accepted_0rtt(&self) -> bool {
2159 self.accepted_0rtt
2160 }
2161
2162 pub fn has_0rtt(&self) -> bool {
2164 self.zero_rtt_enabled
2165 }
2166
2167 pub fn has_pending_retransmits(&self) -> bool {
2169 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2170 }
2171
2172 pub fn side(&self) -> Side {
2174 self.side.side()
2175 }
2176
2177 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2179 self.path(path_id)
2180 .map(|path_data| {
2181 path_data
2182 .last_observed_addr_report
2183 .as_ref()
2184 .map(|observed| observed.socket_addr())
2185 })
2186 .ok_or(ClosedPath { _private: () })
2187 }
2188
2189 pub fn local_ip(&self) -> Option<IpAddr> {
2199 self.local_ip
2200 }
2201
2202 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2204 self.path(path_id).map(|d| d.rtt.get())
2205 }
2206
2207 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2209 self.path(path_id).map(|d| d.congestion.as_ref())
2210 }
2211
2212 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2217 self.streams.set_max_concurrent(dir, count);
2218 let pending = &mut self.spaces[SpaceId::Data].pending;
2221 self.streams.queue_max_stream_id(pending);
2222 }
2223
2224 pub fn set_max_concurrent_paths(
2234 &mut self,
2235 now: Instant,
2236 count: NonZeroU32,
2237 ) -> Result<(), MultipathNotNegotiated> {
2238 if !self.is_multipath_negotiated() {
2239 return Err(MultipathNotNegotiated { _private: () });
2240 }
2241 self.max_concurrent_paths = count;
2242
2243 let in_use_count = self
2244 .local_max_path_id
2245 .next()
2246 .saturating_sub(self.abandoned_paths.len() as u32)
2247 .as_u32();
2248 let extra_needed = count.get().saturating_sub(in_use_count);
2249 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2250
2251 self.set_max_path_id(now, new_max_path_id);
2252
2253 Ok(())
2254 }
2255
2256 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2258 if max_path_id <= self.local_max_path_id {
2259 return;
2260 }
2261
2262 self.local_max_path_id = max_path_id;
2263 self.spaces[SpaceId::Data].pending.max_path_id = true;
2264
2265 self.issue_first_path_cids(now);
2266 }
2267
2268 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
2274 self.streams.max_concurrent(dir)
2275 }
2276
2277 pub fn set_send_window(&mut self, send_window: u64) {
2279 self.streams.set_send_window(send_window);
2280 }
2281
2282 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2284 if self.streams.set_receive_window(receive_window) {
2285 self.spaces[SpaceId::Data].pending.max_data = true;
2286 }
2287 }
2288
2289 pub fn is_multipath_negotiated(&self) -> bool {
2294 !self.is_handshaking()
2295 && self.config.max_concurrent_multipath_paths.is_some()
2296 && self.peer_params.initial_max_path_id.is_some()
2297 }
2298
2299 fn on_ack_received(
2300 &mut self,
2301 now: Instant,
2302 space: SpaceId,
2303 ack: frame::Ack,
2304 ) -> Result<(), TransportError> {
2305 let path = PathId::ZERO;
2307 self.inner_on_ack_received(now, space, path, ack)
2308 }
2309
2310 fn on_path_ack_received(
2311 &mut self,
2312 now: Instant,
2313 space: SpaceId,
2314 path_ack: frame::PathAck,
2315 ) -> Result<(), TransportError> {
2316 let (ack, path) = path_ack.into_ack();
2317 self.inner_on_ack_received(now, space, path, ack)
2318 }
2319
2320 fn inner_on_ack_received(
2322 &mut self,
2323 now: Instant,
2324 space: SpaceId,
2325 path: PathId,
2326 ack: frame::Ack,
2327 ) -> Result<(), TransportError> {
2328 if self.abandoned_paths.contains(&path) {
2329 trace!("silently ignoring PATH_ACK on abandoned path");
2332 return Ok(());
2333 }
2334 if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
2335 return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
2336 }
2337 let new_largest = {
2338 let space = &mut self.spaces[space].for_path(path);
2339 if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
2340 space.largest_acked_packet = Some(ack.largest);
2341 if let Some(info) = space.sent_packets.get(ack.largest) {
2342 space.largest_acked_packet_sent = info.time_sent;
2346 }
2347 true
2348 } else {
2349 false
2350 }
2351 };
2352
2353 if self.detect_spurious_loss(&ack, space, path) {
2354 self.path_data_mut(path)
2355 .congestion
2356 .on_spurious_congestion_event();
2357 }
2358
2359 let mut newly_acked = ArrayRangeSet::new();
2361 for range in ack.iter() {
2362 self.spaces[space].for_path(path).check_ack(range.clone())?;
2363 for (pn, _) in self.spaces[space]
2364 .for_path(path)
2365 .sent_packets
2366 .iter_range(range)
2367 {
2368 newly_acked.insert_one(pn);
2369 }
2370 }
2371
2372 if newly_acked.is_empty() {
2373 return Ok(());
2374 }
2375
2376 let mut ack_eliciting_acked = false;
2377 for packet in newly_acked.elts() {
2378 if let Some(info) = self.spaces[space].for_path(path).take(packet) {
2379 for (acked_path_id, acked_pn) in info.largest_acked.iter() {
2380 if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
2386 pns.pending_acks.subtract_below(*acked_pn);
2387 }
2388 }
2389 ack_eliciting_acked |= info.ack_eliciting;
2390
2391 let path_data = self.path_data_mut(path);
2393 let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
2394 if mtu_updated {
2395 path_data
2396 .congestion
2397 .on_mtu_update(path_data.mtud.current_mtu());
2398 }
2399
2400 self.ack_frequency.on_acked(path, packet);
2402
2403 self.on_packet_acked(now, path, info);
2404 }
2405 }
2406
2407 let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
2408 let app_limited = self.app_limited;
2409 let path_data = self.path_data_mut(path);
2410 let in_flight = path_data.in_flight.bytes;
2411
2412 path_data
2413 .congestion
2414 .on_end_acks(now, in_flight, app_limited, largest_ackd);
2415
2416 if new_largest && ack_eliciting_acked {
2417 let ack_delay = if space != SpaceId::Data {
2418 Duration::from_micros(0)
2419 } else {
2420 cmp::min(
2421 self.ack_frequency.peer_max_ack_delay,
2422 Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
2423 )
2424 };
2425 let rtt = now.saturating_duration_since(
2426 self.spaces[space].for_path(path).largest_acked_packet_sent,
2427 );
2428
2429 let next_pn = self.spaces[space].for_path(path).next_packet_number;
2430 let path_data = self.path_data_mut(path);
2431 path_data.rtt.update(ack_delay, rtt);
2433 if path_data.first_packet_after_rtt_sample.is_none() {
2434 path_data.first_packet_after_rtt_sample = Some((space, next_pn));
2435 }
2436 }
2437
2438 self.detect_lost_packets(now, space, path, true);
2440
2441 if self.peer_completed_address_validation(path) {
2442 self.path_data_mut(path).pto_count = 0;
2443 }
2444
2445 if self.path_data(path).sending_ecn {
2450 if let Some(ecn) = ack.ecn {
2451 if new_largest {
2456 let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
2457 self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
2458 }
2459 } else {
2460 debug!("ECN not acknowledged by peer");
2462 self.path_data_mut(path).sending_ecn = false;
2463 }
2464 }
2465
2466 self.set_loss_detection_timer(now, path);
2467 Ok(())
2468 }
2469
2470 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2471 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2472
2473 if lost_packets.is_empty() {
2474 return false;
2475 }
2476
2477 for range in ack.iter() {
2478 let spurious_losses: Vec<u64> = lost_packets
2479 .iter_range(range.clone())
2480 .map(|(pn, _info)| pn)
2481 .collect();
2482
2483 for pn in spurious_losses {
2484 lost_packets.remove(pn);
2485 }
2486 }
2487
2488 lost_packets.is_empty()
2493 }
2494
2495 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2500 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2501
2502 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2503 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2504 }
2505
2506 fn process_ecn(
2508 &mut self,
2509 now: Instant,
2510 space: SpaceId,
2511 path: PathId,
2512 newly_acked: u64,
2513 ecn: frame::EcnCounts,
2514 largest_sent_time: Instant,
2515 ) {
2516 match self.spaces[space]
2517 .for_path(path)
2518 .detect_ecn(newly_acked, ecn)
2519 {
2520 Err(e) => {
2521 debug!("halting ECN due to verification failure: {}", e);
2522
2523 self.path_data_mut(path).sending_ecn = false;
2524 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2527 }
2528 Ok(false) => {}
2529 Ok(true) => {
2530 self.path_stats.entry(path).or_default().congestion_events += 1;
2531 self.path_data_mut(path).congestion.on_congestion_event(
2532 now,
2533 largest_sent_time,
2534 false,
2535 true,
2536 0,
2537 );
2538 }
2539 }
2540 }
2541
2542 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2545 self.paths
2546 .get_mut(&path_id)
2547 .expect("known path")
2548 .remove_in_flight(&info);
2549 let app_limited = self.app_limited;
2550 let path = self.path_data_mut(path_id);
2551 if info.ack_eliciting && !path.is_validating_path() {
2552 let rtt = path.rtt;
2555 path.congestion
2556 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2557 }
2558
2559 if let Some(retransmits) = info.retransmits.get() {
2561 for (id, _) in retransmits.reset_stream.iter() {
2562 self.streams.reset_acked(*id);
2563 }
2564 }
2565
2566 for frame in info.stream_frames {
2567 self.streams.received_ack_of(frame);
2568 }
2569 }
2570
2571 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2572 let start = if self.zero_rtt_crypto.is_some() {
2573 now
2574 } else {
2575 self.prev_crypto
2576 .as_ref()
2577 .expect("no previous keys")
2578 .end_packet
2579 .as_ref()
2580 .expect("update not acknowledged yet")
2581 .1
2582 };
2583
2584 self.timers.set(
2586 Timer::Conn(ConnTimer::KeyDiscard),
2587 start + self.pto_max_path(space, false) * 3,
2588 self.qlog.with_time(now),
2589 );
2590 }
2591
2592 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2605 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2606 self.detect_lost_packets(now, pn_space, path_id, false);
2608 self.set_loss_detection_timer(now, path_id);
2609 return;
2610 }
2611
2612 let (_, space) = match self.pto_time_and_space(now, path_id) {
2613 Some(x) => x,
2614 None => {
2615 error!(%path_id, "PTO expired while unset");
2616 return;
2617 }
2618 };
2619 trace!(
2620 in_flight = self.path_data(path_id).in_flight.bytes,
2621 count = self.path_data(path_id).pto_count,
2622 ?space,
2623 %path_id,
2624 "PTO fired"
2625 );
2626
2627 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2628 0 => {
2631 debug_assert!(!self.peer_completed_address_validation(path_id));
2632 1
2633 }
2634 _ => 2,
2636 };
2637 let pns = self.spaces[space].for_path(path_id);
2638 pns.loss_probes = pns.loss_probes.saturating_add(count);
2639 let path_data = self.path_data_mut(path_id);
2640 path_data.pto_count = path_data.pto_count.saturating_add(1);
2641 self.set_loss_detection_timer(now, path_id);
2642 }
2643
2644 fn detect_lost_packets(
2661 &mut self,
2662 now: Instant,
2663 pn_space: SpaceId,
2664 path_id: PathId,
2665 due_to_ack: bool,
2666 ) {
2667 let mut lost_packets = Vec::<u64>::new();
2668 let mut lost_mtu_probe = None;
2669 let mut in_persistent_congestion = false;
2670 let mut size_of_lost_packets = 0u64;
2671 self.spaces[pn_space].for_path(path_id).loss_time = None;
2672
2673 let path = self.path_data(path_id);
2676 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2677 let loss_delay = path
2678 .rtt
2679 .conservative()
2680 .mul_f32(self.config.time_threshold)
2681 .max(TIMER_GRANULARITY);
2682 let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;
2683
2684 let largest_acked_packet = self.spaces[pn_space]
2685 .for_path(path_id)
2686 .largest_acked_packet
2687 .expect("detect_lost_packets only to be called if path received at least one ACK");
2688 let packet_threshold = self.config.packet_threshold as u64;
2689
2690 let congestion_period = self
2694 .pto(SpaceId::Data, path_id)
2695 .saturating_mul(self.config.persistent_congestion_threshold);
2696 let mut persistent_congestion_start: Option<Instant> = None;
2697 let mut prev_packet = None;
2698 let space = self.spaces[pn_space].for_path(path_id);
2699
2700 for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
2701 if prev_packet != Some(packet.wrapping_sub(1)) {
2702 persistent_congestion_start = None;
2704 }
2705
2706 let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
2710 if packet_too_old || largest_acked_packet >= packet + packet_threshold {
2711 if Some(packet) == in_flight_mtu_probe {
2713 lost_mtu_probe = in_flight_mtu_probe;
2716 } else {
2717 lost_packets.push(packet);
2718 size_of_lost_packets += info.size as u64;
2719 if info.ack_eliciting && due_to_ack {
2720 match persistent_congestion_start {
2721 Some(start) if info.time_sent - start > congestion_period => {
2724 in_persistent_congestion = true;
2725 }
2726 None if first_packet_after_rtt_sample
2728 .is_some_and(|x| x < (pn_space, packet)) =>
2729 {
2730 persistent_congestion_start = Some(info.time_sent);
2731 }
2732 _ => {}
2733 }
2734 }
2735 }
2736 } else {
2737 if space.loss_time.is_none() {
2739 space.loss_time = Some(info.time_sent + loss_delay);
2742 }
2743 persistent_congestion_start = None;
2744 }
2745
2746 prev_packet = Some(packet);
2747 }
2748
2749 self.handle_lost_packets(
2750 pn_space,
2751 path_id,
2752 now,
2753 lost_packets,
2754 lost_mtu_probe,
2755 loss_delay,
2756 in_persistent_congestion,
2757 size_of_lost_packets,
2758 );
2759 }
2760
2761 fn discard_path(&mut self, path_id: PathId, now: Instant) {
2763 trace!(%path_id, "dropping path state");
2764 let path = self.path_data(path_id);
2765 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2766
2767 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2769 .for_path(path_id)
2770 .sent_packets
2771 .iter()
2772 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2773 .map(|(pn, info)| {
2774 size_of_lost_packets += info.size as u64;
2775 pn
2776 })
2777 .collect();
2778
2779 if !lost_pns.is_empty() {
2780 trace!(
2781 %path_id,
2782 count = lost_pns.len(),
2783 lost_bytes = size_of_lost_packets,
2784 "packets lost on path abandon"
2785 );
2786 self.handle_lost_packets(
2787 SpaceId::Data,
2788 path_id,
2789 now,
2790 lost_pns,
2791 in_flight_mtu_probe,
2792 Duration::ZERO,
2793 false,
2794 size_of_lost_packets,
2795 );
2796 }
2797 self.paths.remove(&path_id);
2798 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2799
2800 let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2801 self.events.push_back(
2802 PathEvent::Abandoned {
2803 id: path_id,
2804 path_stats,
2805 }
2806 .into(),
2807 );
2808 }
2809
2810 fn handle_lost_packets(
2811 &mut self,
2812 pn_space: SpaceId,
2813 path_id: PathId,
2814 now: Instant,
2815 lost_packets: Vec<u64>,
2816 lost_mtu_probe: Option<u64>,
2817 loss_delay: Duration,
2818 in_persistent_congestion: bool,
2819 size_of_lost_packets: u64,
2820 ) {
2821 debug_assert!(
2822 {
2823 let mut sorted = lost_packets.clone();
2824 sorted.sort();
2825 sorted == lost_packets
2826 },
2827 "lost_packets must be sorted"
2828 );
2829
2830 self.drain_lost_packets(now, pn_space, path_id);
2831
2832 if let Some(largest_lost) = lost_packets.last().cloned() {
2834 let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
2835 let largest_lost_sent = self.spaces[pn_space]
2836 .for_path(path_id)
2837 .sent_packets
2838 .get(largest_lost)
2839 .unwrap()
2840 .time_sent;
2841 let path_stats = self.path_stats.entry(path_id).or_default();
2842 path_stats.lost_packets += lost_packets.len() as u64;
2843 path_stats.lost_bytes += size_of_lost_packets;
2844 trace!(
2845 %path_id,
2846 count = lost_packets.len(),
2847 lost_bytes = size_of_lost_packets,
2848 "packets lost",
2849 );
2850
2851 for &packet in &lost_packets {
2852 let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
2853 continue;
2854 };
2855 self.qlog
2856 .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
2857 self.paths
2858 .get_mut(&path_id)
2859 .unwrap()
2860 .remove_in_flight(&info);
2861
2862 for frame in info.stream_frames {
2863 self.streams.retransmit(frame);
2864 }
2865 self.spaces[pn_space].pending |= info.retransmits;
2866 self.path_data_mut(path_id)
2867 .mtud
2868 .on_non_probe_lost(packet, info.size);
2869
2870 self.spaces[pn_space].for_path(path_id).lost_packets.insert(
2871 packet,
2872 LostPacket {
2873 time_sent: info.time_sent,
2874 },
2875 );
2876 }
2877
2878 let path = self.path_data_mut(path_id);
2879 if path.mtud.black_hole_detected(now) {
2880 path.congestion.on_mtu_update(path.mtud.current_mtu());
2881 if let Some(max_datagram_size) = self.datagrams().max_size() {
2882 self.datagrams.drop_oversized(max_datagram_size);
2883 }
2884 self.path_stats
2885 .entry(path_id)
2886 .or_default()
2887 .black_holes_detected += 1;
2888 }
2889
2890 let lost_ack_eliciting =
2892 old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;
2893
2894 if lost_ack_eliciting {
2895 self.path_stats
2896 .entry(path_id)
2897 .or_default()
2898 .congestion_events += 1;
2899 self.path_data_mut(path_id).congestion.on_congestion_event(
2900 now,
2901 largest_lost_sent,
2902 in_persistent_congestion,
2903 false,
2904 size_of_lost_packets,
2905 );
2906 }
2907 }
2908
2909 if let Some(packet) = lost_mtu_probe {
2911 let info = self.spaces[SpaceId::Data]
2912 .for_path(path_id)
2913 .take(packet)
2914 .unwrap(); self.paths
2917 .get_mut(&path_id)
2918 .unwrap()
2919 .remove_in_flight(&info);
2920 self.path_data_mut(path_id).mtud.on_probe_lost();
2921 self.path_stats
2922 .entry(path_id)
2923 .or_default()
2924 .lost_plpmtud_probes += 1;
2925 }
2926 }
2927
2928 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2934 SpaceId::iter()
2935 .filter_map(|id| {
2936 self.spaces[id]
2937 .number_spaces
2938 .get(&path_id)
2939 .and_then(|pns| pns.loss_time)
2940 .map(|time| (time, id))
2941 })
2942 .min_by_key(|&(time, _)| time)
2943 }
2944
2945 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2947 let path = self.path(path_id)?;
2948 let pto_count = path.pto_count;
2949 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2950 let mut duration = path.rtt.pto_base() * backoff;
2951
2952 if path_id == PathId::ZERO
2953 && path.in_flight.ack_eliciting == 0
2954 && !self.peer_completed_address_validation(PathId::ZERO)
2955 {
2956 let space = match self.highest_space {
2962 SpaceId::Handshake => SpaceId::Handshake,
2963 _ => SpaceId::Initial,
2964 };
2965
2966 return Some((now + duration, space));
2967 }
2968
2969 let mut result = None;
2970 for space in SpaceId::iter() {
2971 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2972 continue;
2973 };
2974
2975 if !pns.has_in_flight() {
2976 continue;
2977 }
2978 if space == SpaceId::Data {
2979 if self.is_handshaking() {
2981 return result;
2982 }
2983 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
2985 }
2986 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
2987 continue;
2988 };
2989 let pto = last_ack_eliciting + duration;
2990 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
2991 if path.anti_amplification_blocked(1) {
2992 continue;
2994 }
2995 if path.in_flight.ack_eliciting == 0 {
2996 continue;
2998 }
2999 result = Some((pto, space));
3000 }
3001 }
3002 result
3003 }
3004
3005 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3006 if self.side.is_server() || self.state.is_closed() {
3008 return true;
3009 }
3010 self.spaces[SpaceId::Handshake]
3013 .path_space(PathId::ZERO)
3014 .and_then(|pns| pns.largest_acked_packet)
3015 .is_some()
3016 || self.spaces[SpaceId::Data]
3017 .path_space(path)
3018 .and_then(|pns| pns.largest_acked_packet)
3019 .is_some()
3020 || (self.spaces[SpaceId::Data].crypto.is_some()
3021 && self.spaces[SpaceId::Handshake].crypto.is_none())
3022 }
3023
3024 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3032 if self.state.is_closed() {
3033 return;
3037 }
3038
3039 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3040 self.timers.set(
3042 Timer::PerPath(path_id, PathTimer::LossDetection),
3043 loss_time,
3044 self.qlog.with_time(now),
3045 );
3046 return;
3047 }
3048
3049 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3052 self.timers.set(
3053 Timer::PerPath(path_id, PathTimer::LossDetection),
3054 timeout,
3055 self.qlog.with_time(now),
3056 );
3057 } else {
3058 self.timers.stop(
3059 Timer::PerPath(path_id, PathTimer::LossDetection),
3060 self.qlog.with_time(now),
3061 );
3062 }
3063 }
3064
3065 fn pto_max_path(&self, space: SpaceId, is_closing: bool) -> Duration {
3071 match space {
3072 SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3073 SpaceId::Data => self
3074 .paths
3075 .iter()
3076 .filter_map(|(path_id, state)| {
3077 if is_closing && state.data.total_sent == 0 && state.data.total_recvd == 0 {
3078 None
3080 } else {
3081 let pto = self.pto(space, *path_id);
3082 Some(pto)
3083 }
3084 })
3085 .max()
3086 .expect("there should be at least one path"),
3087 }
3088 }
3089
3090 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3095 let max_ack_delay = match space {
3096 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3097 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3098 };
3099 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3100 }
3101
3102 fn on_packet_authenticated(
3103 &mut self,
3104 now: Instant,
3105 space_id: SpaceId,
3106 path_id: PathId,
3107 ecn: Option<EcnCodepoint>,
3108 packet: Option<u64>,
3109 spin: bool,
3110 is_1rtt: bool,
3111 ) {
3112 self.total_authed_packets += 1;
3113 if let Some(AbandonState::ExpectingPathAbandon { deadline }) = self
3114 .paths
3115 .get(&path_id)
3116 .map(|path| &path.data.abandon_state)
3117 {
3118 if now > *deadline {
3119 warn!("received data on path which we abandoned more than 3 * PTO ago");
3120 if !self.state.is_closed() {
3122 self.state.move_to_closed(TransportError::NO_ERROR(
3124 "peer failed to respond with PATH_ABANDON in time",
3125 ));
3126 self.close_common();
3127 self.set_close_timer(now);
3128 self.close = true;
3129 }
3130 return;
3131 }
3132 }
3133
3134 self.reset_keep_alive(path_id, now);
3135 self.reset_idle_timeout(now, space_id, path_id);
3136 self.permit_idle_reset = true;
3137 self.receiving_ecn |= ecn.is_some();
3138 if let Some(x) = ecn {
3139 let space = &mut self.spaces[space_id];
3140 space.for_path(path_id).ecn_counters += x;
3141
3142 if x.is_ce() {
3143 space
3144 .for_path(path_id)
3145 .pending_acks
3146 .set_immediate_ack_required();
3147 }
3148 }
3149
3150 let packet = match packet {
3151 Some(x) => x,
3152 None => return,
3153 };
3154 match &self.side {
3155 ConnectionSide::Client { .. } => {
3156 if space_id == SpaceId::Handshake {
3160 if let Some(hs) = self.state.as_handshake_mut() {
3161 hs.allow_server_migration = false;
3162 }
3163 }
3164 }
3165 ConnectionSide::Server { .. } => {
3166 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3167 {
3168 self.discard_space(now, SpaceId::Initial);
3170 }
3171 if self.zero_rtt_crypto.is_some() && is_1rtt {
3172 self.set_key_discard_timer(now, space_id)
3174 }
3175 }
3176 }
3177 let space = self.spaces[space_id].for_path(path_id);
3178 space.pending_acks.insert_one(packet, now);
3179 if packet >= space.rx_packet.unwrap_or_default() {
3180 space.rx_packet = Some(packet);
3181 self.spin = self.side.is_client() ^ spin;
3183 }
3184 }
3185
3186 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3191 if let Some(timeout) = self.idle_timeout {
3193 if self.state.is_closed() {
3194 self.timers
3195 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3196 } else {
3197 let dt = cmp::max(timeout, 3 * self.pto_max_path(space, false));
3198 self.timers.set(
3199 Timer::Conn(ConnTimer::Idle),
3200 now + dt,
3201 self.qlog.with_time(now),
3202 );
3203 }
3204 }
3205
3206 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3208 if self.state.is_closed() {
3209 self.timers.stop(
3210 Timer::PerPath(path_id, PathTimer::PathIdle),
3211 self.qlog.with_time(now),
3212 );
3213 } else {
3214 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3215 self.timers.set(
3216 Timer::PerPath(path_id, PathTimer::PathIdle),
3217 now + dt,
3218 self.qlog.with_time(now),
3219 );
3220 }
3221 }
3222 }
3223
3224 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3226 if !self.state.is_established() {
3227 return;
3228 }
3229
3230 if let Some(interval) = self.config.keep_alive_interval {
3231 self.timers.set(
3232 Timer::Conn(ConnTimer::KeepAlive),
3233 now + interval,
3234 self.qlog.with_time(now),
3235 );
3236 }
3237
3238 if let Some(interval) = self.path_data(path_id).keep_alive {
3239 self.timers.set(
3240 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3241 now + interval,
3242 self.qlog.with_time(now),
3243 );
3244 }
3245 }
3246
3247 fn reset_cid_retirement(&mut self, now: Instant) {
3249 if let Some((_path, t)) = self.next_cid_retirement() {
3250 self.timers.set(
3251 Timer::Conn(ConnTimer::PushNewCid),
3252 t,
3253 self.qlog.with_time(now),
3254 );
3255 }
3256 }
3257
3258 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3260 self.local_cid_state
3261 .iter()
3262 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3263 .min_by_key(|(_path_id, timeout)| *timeout)
3264 }
3265
3266 pub(crate) fn handle_first_packet(
3271 &mut self,
3272 now: Instant,
3273 remote: SocketAddr,
3274 ecn: Option<EcnCodepoint>,
3275 packet_number: u64,
3276 packet: InitialPacket,
3277 remaining: Option<BytesMut>,
3278 ) -> Result<(), ConnectionError> {
3279 let span = trace_span!("first recv");
3280 let _guard = span.enter();
3281 debug_assert!(self.side.is_server());
3282 let len = packet.header_data.len() + packet.payload.len();
3283 let path_id = PathId::ZERO;
3284 self.path_data_mut(path_id).total_recvd = len as u64;
3285
3286 if let Some(hs) = self.state.as_handshake_mut() {
3287 hs.expected_token = packet.header.token.clone();
3288 } else {
3289 unreachable!("first packet must be delivered in Handshake state");
3290 }
3291
3292 self.on_packet_authenticated(
3294 now,
3295 SpaceId::Initial,
3296 path_id,
3297 ecn,
3298 Some(packet_number),
3299 false,
3300 false,
3301 );
3302
3303 let packet: Packet = packet.into();
3304
3305 let mut qlog = QlogRecvPacket::new(len);
3306 qlog.header(&packet.header, Some(packet_number), path_id);
3307
3308 self.process_decrypted_packet(
3309 now,
3310 remote,
3311 path_id,
3312 Some(packet_number),
3313 packet,
3314 &mut qlog,
3315 )?;
3316 self.qlog.emit_packet_received(qlog, now);
3317 if let Some(data) = remaining {
3318 self.handle_coalesced(now, remote, path_id, ecn, data);
3319 }
3320
3321 self.qlog.emit_recovery_metrics(
3322 path_id,
3323 &mut self.paths.get_mut(&path_id).unwrap().data,
3324 now,
3325 );
3326
3327 Ok(())
3328 }
3329
3330 fn init_0rtt(&mut self, now: Instant) {
3331 let (header, packet) = match self.crypto.early_crypto() {
3332 Some(x) => x,
3333 None => return,
3334 };
3335 if self.side.is_client() {
3336 match self.crypto.transport_parameters() {
3337 Ok(params) => {
3338 let params = params
3339 .expect("crypto layer didn't supply transport parameters with ticket");
3340 let params = TransportParameters {
3342 initial_src_cid: None,
3343 original_dst_cid: None,
3344 preferred_address: None,
3345 retry_src_cid: None,
3346 stateless_reset_token: None,
3347 min_ack_delay: None,
3348 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3349 max_ack_delay: TransportParameters::default().max_ack_delay,
3350 initial_max_path_id: None,
3351 ..params
3352 };
3353 self.set_peer_params(params);
3354 self.qlog.emit_peer_transport_params_restored(self, now);
3355 }
3356 Err(e) => {
3357 error!("session ticket has malformed transport parameters: {}", e);
3358 return;
3359 }
3360 }
3361 }
3362 trace!("0-RTT enabled");
3363 self.zero_rtt_enabled = true;
3364 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3365 }
3366
3367 fn read_crypto(
3368 &mut self,
3369 space: SpaceId,
3370 crypto: &frame::Crypto,
3371 payload_len: usize,
3372 ) -> Result<(), TransportError> {
3373 let expected = if !self.state.is_handshake() {
3374 SpaceId::Data
3375 } else if self.highest_space == SpaceId::Initial {
3376 SpaceId::Initial
3377 } else {
3378 SpaceId::Handshake
3381 };
3382 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3386
3387 let end = crypto.offset + crypto.data.len() as u64;
3388 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3389 warn!(
3390 "received new {:?} CRYPTO data when expecting {:?}",
3391 space, expected
3392 );
3393 return Err(TransportError::PROTOCOL_VIOLATION(
3394 "new data at unexpected encryption level",
3395 ));
3396 }
3397
3398 let space = &mut self.spaces[space];
3399 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3400 if max > self.config.crypto_buffer_size as u64 {
3401 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3402 }
3403
3404 space
3405 .crypto_stream
3406 .insert(crypto.offset, crypto.data.clone(), payload_len);
3407 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3408 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3409 if self.crypto.read_handshake(&chunk.bytes)? {
3410 self.events.push_back(Event::HandshakeDataReady);
3411 }
3412 }
3413
3414 Ok(())
3415 }
3416
3417 fn write_crypto(&mut self) {
3418 loop {
3419 let space = self.highest_space;
3420 let mut outgoing = Vec::new();
3421 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3422 match space {
3423 SpaceId::Initial => {
3424 self.upgrade_crypto(SpaceId::Handshake, crypto);
3425 }
3426 SpaceId::Handshake => {
3427 self.upgrade_crypto(SpaceId::Data, crypto);
3428 }
3429 _ => unreachable!("got updated secrets during 1-RTT"),
3430 }
3431 }
3432 if outgoing.is_empty() {
3433 if space == self.highest_space {
3434 break;
3435 } else {
3436 continue;
3438 }
3439 }
3440 let offset = self.spaces[space].crypto_offset;
3441 let outgoing = Bytes::from(outgoing);
3442 if let Some(hs) = self.state.as_handshake_mut() {
3443 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3444 hs.client_hello = Some(outgoing.clone());
3445 }
3446 }
3447 self.spaces[space].crypto_offset += outgoing.len() as u64;
3448 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3449 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3450 offset,
3451 data: outgoing,
3452 });
3453 }
3454 }
3455
3456 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3458 debug_assert!(
3459 self.spaces[space].crypto.is_none(),
3460 "already reached packet space {space:?}"
3461 );
3462 trace!("{:?} keys ready", space);
3463 if space == SpaceId::Data {
3464 self.next_crypto = Some(
3466 self.crypto
3467 .next_1rtt_keys()
3468 .expect("handshake should be complete"),
3469 );
3470 }
3471
3472 self.spaces[space].crypto = Some(crypto);
3473 debug_assert!(space as usize > self.highest_space as usize);
3474 self.highest_space = space;
3475 if space == SpaceId::Data && self.side.is_client() {
3476 self.zero_rtt_crypto = None;
3478 }
3479 }
3480
3481 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3482 debug_assert!(space_id != SpaceId::Data);
3483 trace!("discarding {:?} keys", space_id);
3484 if space_id == SpaceId::Initial {
3485 if let ConnectionSide::Client { token, .. } = &mut self.side {
3487 *token = Bytes::new();
3488 }
3489 }
3490 let space = &mut self.spaces[space_id];
3491 space.crypto = None;
3492 let pns = space.for_path(PathId::ZERO);
3493 pns.time_of_last_ack_eliciting_packet = None;
3494 pns.loss_time = None;
3495 pns.loss_probes = 0;
3496 let sent_packets = mem::take(&mut pns.sent_packets);
3497 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3498 for (_, packet) in sent_packets.into_iter() {
3499 path.data.remove_in_flight(&packet);
3500 }
3501
3502 self.set_loss_detection_timer(now, PathId::ZERO)
3503 }
3504
3505 fn handle_coalesced(
3506 &mut self,
3507 now: Instant,
3508 remote: SocketAddr,
3509 path_id: PathId,
3510 ecn: Option<EcnCodepoint>,
3511 data: BytesMut,
3512 ) {
3513 self.path_data_mut(path_id)
3514 .inc_total_recvd(data.len() as u64);
3515 let mut remaining = Some(data);
3516 let cid_len = self
3517 .local_cid_state
3518 .values()
3519 .map(|cid_state| cid_state.cid_len())
3520 .next()
3521 .expect("one cid_state must exist");
3522 while let Some(data) = remaining {
3523 match PartialDecode::new(
3524 data,
3525 &FixedLengthConnectionIdParser::new(cid_len),
3526 &[self.version],
3527 self.endpoint_config.grease_quic_bit,
3528 ) {
3529 Ok((partial_decode, rest)) => {
3530 remaining = rest;
3531 self.handle_decode(now, remote, path_id, ecn, partial_decode);
3532 }
3533 Err(e) => {
3534 trace!("malformed header: {}", e);
3535 return;
3536 }
3537 }
3538 }
3539 }
3540
3541 fn handle_decode(
3542 &mut self,
3543 now: Instant,
3544 remote: SocketAddr,
3545 path_id: PathId,
3546 ecn: Option<EcnCodepoint>,
3547 partial_decode: PartialDecode,
3548 ) {
3549 let qlog = QlogRecvPacket::new(partial_decode.len());
3550 if let Some(decoded) = packet_crypto::unprotect_header(
3551 partial_decode,
3552 &self.spaces,
3553 self.zero_rtt_crypto.as_ref(),
3554 self.peer_params.stateless_reset_token,
3555 ) {
3556 self.handle_packet(
3557 now,
3558 remote,
3559 path_id,
3560 ecn,
3561 decoded.packet,
3562 decoded.stateless_reset,
3563 qlog,
3564 );
3565 }
3566 }
3567
3568 fn handle_packet(
3569 &mut self,
3570 now: Instant,
3571 remote: SocketAddr,
3572 path_id: PathId,
3573 ecn: Option<EcnCodepoint>,
3574 packet: Option<Packet>,
3575 stateless_reset: bool,
3576 mut qlog: QlogRecvPacket,
3577 ) {
3578 self.stats.udp_rx.ios += 1;
3579 if let Some(ref packet) = packet {
3580 trace!(
3581 "got {:?} packet ({} bytes) from {} using id {}",
3582 packet.header.space(),
3583 packet.payload.len() + packet.header_data.len(),
3584 remote,
3585 packet.header.dst_cid(),
3586 );
3587 }
3588
3589 if self.is_handshaking() {
3590 if path_id != PathId::ZERO {
3591 debug!(%remote, %path_id, "discarding multipath packet during handshake");
3592 return;
3593 }
3594 if remote != self.path_data_mut(path_id).remote {
3595 if let Some(hs) = self.state.as_handshake() {
3596 if hs.allow_server_migration {
3597 trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
3598 self.path_data_mut(path_id).remote = remote;
3599 self.qlog.emit_tuple_assigned(path_id, remote, now);
3600 } else {
3601 debug!("discarding packet with unexpected remote during handshake");
3602 return;
3603 }
3604 } else {
3605 debug!("discarding packet with unexpected remote during handshake");
3606 return;
3607 }
3608 }
3609 }
3610
3611 let was_closed = self.state.is_closed();
3612 let was_drained = self.state.is_drained();
3613
3614 let decrypted = match packet {
3615 None => Err(None),
3616 Some(mut packet) => self
3617 .decrypt_packet(now, path_id, &mut packet)
3618 .map(move |number| (packet, number)),
3619 };
3620 let result = match decrypted {
3621 _ if stateless_reset => {
3622 debug!("got stateless reset");
3623 Err(ConnectionError::Reset)
3624 }
3625 Err(Some(e)) => {
3626 warn!("illegal packet: {}", e);
3627 Err(e.into())
3628 }
3629 Err(None) => {
3630 debug!("failed to authenticate packet");
3631 self.authentication_failures += 1;
3632 let integrity_limit = self.spaces[self.highest_space]
3633 .crypto
3634 .as_ref()
3635 .unwrap()
3636 .packet
3637 .local
3638 .integrity_limit();
3639 if self.authentication_failures > integrity_limit {
3640 Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
3641 } else {
3642 return;
3643 }
3644 }
3645 Ok((packet, number)) => {
3646 qlog.header(&packet.header, number, path_id);
3647 let span = match number {
3648 Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
3649 None => trace_span!("recv", space = ?packet.header.space()),
3650 };
3651 let _guard = span.enter();
3652
3653 let dedup = self.spaces[packet.header.space()]
3654 .path_space_mut(path_id)
3655 .map(|pns| &mut pns.dedup);
3656 if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
3657 debug!("discarding possible duplicate packet");
3658 self.qlog.emit_packet_received(qlog, now);
3659 return;
3660 } else if self.state.is_handshake() && packet.header.is_short() {
3661 trace!("dropping short packet during handshake");
3663 self.qlog.emit_packet_received(qlog, now);
3664 return;
3665 } else {
3666 if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
3667 if let Some(hs) = self.state.as_handshake() {
3668 if self.side.is_server() && token != &hs.expected_token {
3669 warn!("discarding Initial with invalid retry token");
3673 self.qlog.emit_packet_received(qlog, now);
3674 return;
3675 }
3676 }
3677 }
3678
3679 if !self.state.is_closed() {
3680 let spin = match packet.header {
3681 Header::Short { spin, .. } => spin,
3682 _ => false,
3683 };
3684
3685 if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
3686 self.ensure_path(path_id, remote, now, number);
3688 }
3689 if self.paths.contains_key(&path_id) {
3690 self.on_packet_authenticated(
3691 now,
3692 packet.header.space(),
3693 path_id,
3694 ecn,
3695 number,
3696 spin,
3697 packet.header.is_1rtt(),
3698 );
3699 }
3700 }
3701
3702 let res = self
3703 .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);
3704
3705 self.qlog.emit_packet_received(qlog, now);
3706 res
3707 }
3708 }
3709 };
3710
3711 if let Err(conn_err) = result {
3713 match conn_err {
3714 ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
3715 ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
3716 ConnectionError::Reset
3717 | ConnectionError::TransportError(TransportError {
3718 code: TransportErrorCode::AEAD_LIMIT_REACHED,
3719 ..
3720 }) => {
3721 self.state.move_to_drained(Some(conn_err));
3722 }
3723 ConnectionError::TimedOut => {
3724 unreachable!("timeouts aren't generated by packet processing");
3725 }
3726 ConnectionError::TransportError(err) => {
3727 debug!("closing connection due to transport error: {}", err);
3728 self.state.move_to_closed(err);
3729 }
3730 ConnectionError::VersionMismatch => {
3731 self.state.move_to_draining(Some(conn_err));
3732 }
3733 ConnectionError::LocallyClosed => {
3734 unreachable!("LocallyClosed isn't generated by packet processing");
3735 }
3736 ConnectionError::CidsExhausted => {
3737 unreachable!("CidsExhausted isn't generated by packet processing");
3738 }
3739 };
3740 }
3741
3742 if !was_closed && self.state.is_closed() {
3743 self.close_common();
3744 if !self.state.is_drained() {
3745 self.set_close_timer(now);
3746 }
3747 }
3748 if !was_drained && self.state.is_drained() {
3749 self.endpoint_events.push_back(EndpointEventInner::Drained);
3750 self.timers
3753 .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
3754 }
3755
3756 if matches!(self.state.as_type(), StateType::Closed) {
3758 let path_remote = self
3762 .paths
3763 .get(&path_id)
3764 .map(|p| p.data.remote)
3765 .unwrap_or(remote);
3766 self.close = remote == path_remote;
3767 }
3768 }
3769
3770 fn process_decrypted_packet(
3771 &mut self,
3772 now: Instant,
3773 remote: SocketAddr,
3774 path_id: PathId,
3775 number: Option<u64>,
3776 packet: Packet,
3777 qlog: &mut QlogRecvPacket,
3778 ) -> Result<(), ConnectionError> {
3779 if !self.paths.contains_key(&path_id) {
3780 trace!(%path_id, ?number, "discarding packet for unknown path");
3784 return Ok(());
3785 }
3786 let state = match self.state.as_type() {
3787 StateType::Established => {
3788 match packet.header.space() {
3789 SpaceId::Data => {
3790 self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
3791 }
3792 _ if packet.header.has_frames() => {
3793 self.process_early_payload(now, path_id, packet, qlog)?
3794 }
3795 _ => {
3796 trace!("discarding unexpected pre-handshake packet");
3797 }
3798 }
3799 return Ok(());
3800 }
3801 StateType::Closed => {
3802 for result in frame::Iter::new(packet.payload.freeze())? {
3803 let frame = match result {
3804 Ok(frame) => frame,
3805 Err(err) => {
3806 debug!("frame decoding error: {err:?}");
3807 continue;
3808 }
3809 };
3810 qlog.frame(&frame);
3811
3812 if let Frame::Padding = frame {
3813 continue;
3814 };
3815
3816 self.stats.frame_rx.record(&frame);
3817
3818 if let Frame::Close(_error) = frame {
3819 self.state.move_to_draining(None);
3820 break;
3821 }
3822 }
3823 return Ok(());
3824 }
3825 StateType::Draining | StateType::Drained => return Ok(()),
3826 StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
3827 };
3828
3829 match packet.header {
3830 Header::Retry {
3831 src_cid: rem_cid, ..
3832 } => {
3833 debug_assert_eq!(path_id, PathId::ZERO);
3834 if self.side.is_server() {
3835 return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
3836 }
3837
3838 let is_valid_retry = self
3839 .rem_cids
3840 .get(&path_id)
3841 .map(|cids| cids.active())
3842 .map(|orig_dst_cid| {
3843 self.crypto.is_valid_retry(
3844 orig_dst_cid,
3845 &packet.header_data,
3846 &packet.payload,
3847 )
3848 })
3849 .unwrap_or_default();
3850 if self.total_authed_packets > 1
3851 || packet.payload.len() <= 16 || !is_valid_retry
3853 {
3854 trace!("discarding invalid Retry");
3855 return Ok(());
3863 }
3864
3865 trace!("retrying with CID {}", rem_cid);
3866 let client_hello = state.client_hello.take().unwrap();
3867 self.retry_src_cid = Some(rem_cid);
3868 self.rem_cids
3869 .get_mut(&path_id)
3870 .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
3871 .update_initial_cid(rem_cid);
3872 self.rem_handshake_cid = rem_cid;
3873
3874 let space = &mut self.spaces[SpaceId::Initial];
3875 if let Some(info) = space.for_path(PathId::ZERO).take(0) {
3876 self.on_packet_acked(now, PathId::ZERO, info);
3877 };
3878
3879 self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = {
3882 let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
3883 space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
3884 space.crypto_offset = client_hello.len() as u64;
3885 space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
3886 .for_path(path_id)
3887 .next_packet_number;
3888 space.pending.crypto.push_back(frame::Crypto {
3889 offset: 0,
3890 data: client_hello,
3891 });
3892 space
3893 };
3894
3895 let zero_rtt = mem::take(
3897 &mut self.spaces[SpaceId::Data]
3898 .for_path(PathId::ZERO)
3899 .sent_packets,
3900 );
3901 for (_, info) in zero_rtt.into_iter() {
3902 self.paths
3903 .get_mut(&PathId::ZERO)
3904 .unwrap()
3905 .remove_in_flight(&info);
3906 self.spaces[SpaceId::Data].pending |= info.retransmits;
3907 }
3908 self.streams.retransmit_all_for_0rtt();
3909
3910 let token_len = packet.payload.len() - 16;
3911 let ConnectionSide::Client { ref mut token, .. } = self.side else {
3912 unreachable!("we already short-circuited if we're server");
3913 };
3914 *token = packet.payload.freeze().split_to(token_len);
3915
3916 self.state = State::handshake(state::Handshake {
3917 expected_token: Bytes::new(),
3918 rem_cid_set: false,
3919 client_hello: None,
3920 allow_server_migration: true,
3921 });
3922 Ok(())
3923 }
3924 Header::Long {
3925 ty: LongType::Handshake,
3926 src_cid: rem_cid,
3927 dst_cid: loc_cid,
3928 ..
3929 } => {
3930 debug_assert_eq!(path_id, PathId::ZERO);
3931 if rem_cid != self.rem_handshake_cid {
3932 debug!(
3933 "discarding packet with mismatched remote CID: {} != {}",
3934 self.rem_handshake_cid, rem_cid
3935 );
3936 return Ok(());
3937 }
3938 self.on_path_validated(path_id);
3939
3940 self.process_early_payload(now, path_id, packet, qlog)?;
3941 if self.state.is_closed() {
3942 return Ok(());
3943 }
3944
3945 if self.crypto.is_handshaking() {
3946 trace!("handshake ongoing");
3947 return Ok(());
3948 }
3949
3950 if self.side.is_client() {
3951 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
3953 TransportError::new(
3954 TransportErrorCode::crypto(0x6d),
3955 "transport parameters missing".to_owned(),
3956 )
3957 })?;
3958
3959 if self.has_0rtt() {
3960 if !self.crypto.early_data_accepted().unwrap() {
3961 debug_assert!(self.side.is_client());
3962 debug!("0-RTT rejected");
3963 self.accepted_0rtt = false;
3964 self.streams.zero_rtt_rejected();
3965
3966 self.spaces[SpaceId::Data].pending = Retransmits::default();
3968
3969 let sent_packets = mem::take(
3971 &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
3972 );
3973 for (_, packet) in sent_packets.into_iter() {
3974 self.paths
3975 .get_mut(&path_id)
3976 .unwrap()
3977 .remove_in_flight(&packet);
3978 }
3979 } else {
3980 self.accepted_0rtt = true;
3981 params.validate_resumption_from(&self.peer_params)?;
3982 }
3983 }
3984 if let Some(token) = params.stateless_reset_token {
3985 let remote = self.path_data(path_id).remote;
3986 self.endpoint_events
3987 .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
3988 }
3989 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
3990 self.issue_first_cids(now);
3991 } else {
3992 self.spaces[SpaceId::Data].pending.handshake_done = true;
3994 self.discard_space(now, SpaceId::Handshake);
3995 self.events.push_back(Event::HandshakeConfirmed);
3996 trace!("handshake confirmed");
3997 }
3998
3999 self.events.push_back(Event::Connected);
4000 self.state.move_to_established();
4001 trace!("established");
4002
4003 self.issue_first_path_cids(now);
4006 Ok(())
4007 }
4008 Header::Initial(InitialHeader {
4009 src_cid: rem_cid,
4010 dst_cid: loc_cid,
4011 ..
4012 }) => {
4013 debug_assert_eq!(path_id, PathId::ZERO);
4014 if !state.rem_cid_set {
4015 trace!("switching remote CID to {}", rem_cid);
4016 let mut state = state.clone();
4017 self.rem_cids
4018 .get_mut(&path_id)
4019 .expect("PathId::ZERO not yet abandoned")
4020 .update_initial_cid(rem_cid);
4021 self.rem_handshake_cid = rem_cid;
4022 self.orig_rem_cid = rem_cid;
4023 state.rem_cid_set = true;
4024 self.state.move_to_handshake(state);
4025 } else if rem_cid != self.rem_handshake_cid {
4026 debug!(
4027 "discarding packet with mismatched remote CID: {} != {}",
4028 self.rem_handshake_cid, rem_cid
4029 );
4030 return Ok(());
4031 }
4032
4033 let starting_space = self.highest_space;
4034 self.process_early_payload(now, path_id, packet, qlog)?;
4035
4036 if self.side.is_server()
4037 && starting_space == SpaceId::Initial
4038 && self.highest_space != SpaceId::Initial
4039 {
4040 let params = self.crypto.transport_parameters()?.ok_or_else(|| {
4041 TransportError::new(
4042 TransportErrorCode::crypto(0x6d),
4043 "transport parameters missing".to_owned(),
4044 )
4045 })?;
4046 self.handle_peer_params(params, loc_cid, rem_cid, now)?;
4047 self.issue_first_cids(now);
4048 self.init_0rtt(now);
4049 }
4050 Ok(())
4051 }
4052 Header::Long {
4053 ty: LongType::ZeroRtt,
4054 ..
4055 } => {
4056 self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
4057 Ok(())
4058 }
4059 Header::VersionNegotiate { .. } => {
4060 if self.total_authed_packets > 1 {
4061 return Ok(());
4062 }
4063 let supported = packet
4064 .payload
4065 .chunks(4)
4066 .any(|x| match <[u8; 4]>::try_from(x) {
4067 Ok(version) => self.version == u32::from_be_bytes(version),
4068 Err(_) => false,
4069 });
4070 if supported {
4071 return Ok(());
4072 }
4073 debug!("remote doesn't support our version");
4074 Err(ConnectionError::VersionMismatch)
4075 }
4076 Header::Short { .. } => unreachable!(
4077 "short packets received during handshake are discarded in handle_packet"
4078 ),
4079 }
4080 }
4081
4082 fn process_early_payload(
4084 &mut self,
4085 now: Instant,
4086 path_id: PathId,
4087 packet: Packet,
4088 #[allow(unused)] qlog: &mut QlogRecvPacket,
4089 ) -> Result<(), TransportError> {
4090 debug_assert_ne!(packet.header.space(), SpaceId::Data);
4091 debug_assert_eq!(path_id, PathId::ZERO);
4092 let payload_len = packet.payload.len();
4093 let mut ack_eliciting = false;
4094 for result in frame::Iter::new(packet.payload.freeze())? {
4095 let frame = result?;
4096 qlog.frame(&frame);
4097 let span = match frame {
4098 Frame::Padding => continue,
4099 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4100 };
4101
4102 self.stats.frame_rx.record(&frame);
4103
4104 let _guard = span.as_ref().map(|x| x.enter());
4105 ack_eliciting |= frame.is_ack_eliciting();
4106
4107 if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4109 return Err(TransportError::PROTOCOL_VIOLATION(
4110 "illegal frame type in handshake",
4111 ));
4112 }
4113
4114 match frame {
4115 Frame::Padding | Frame::Ping => {}
4116 Frame::Crypto(frame) => {
4117 self.read_crypto(packet.header.space(), &frame, payload_len)?;
4118 }
4119 Frame::Ack(ack) => {
4120 self.on_ack_received(now, packet.header.space(), ack)?;
4121 }
4122 Frame::PathAck(ack) => {
4123 span.as_ref()
4124 .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4125 self.on_path_ack_received(now, packet.header.space(), ack)?;
4126 }
4127 Frame::Close(reason) => {
4128 self.state.move_to_draining(Some(reason.into()));
4129 return Ok(());
4130 }
4131 _ => {
4132 let mut err =
4133 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4134 err.frame = Some(frame.ty());
4135 return Err(err);
4136 }
4137 }
4138 }
4139
4140 if ack_eliciting {
4141 self.spaces[packet.header.space()]
4143 .for_path(path_id)
4144 .pending_acks
4145 .set_immediate_ack_required();
4146 }
4147
4148 self.write_crypto();
4149 Ok(())
4150 }
4151
4152 fn process_payload(
4154 &mut self,
4155 now: Instant,
4156 remote: SocketAddr,
4157 path_id: PathId,
4158 number: u64,
4159 packet: Packet,
4160 #[allow(unused)] qlog: &mut QlogRecvPacket,
4161 ) -> Result<(), TransportError> {
4162 let payload = packet.payload.freeze();
4163 let mut is_probing_packet = true;
4164 let mut close = None;
4165 let payload_len = payload.len();
4166 let mut ack_eliciting = false;
4167 let mut migration_observed_addr = None;
4170 for result in frame::Iter::new(payload)? {
4171 let frame = result?;
4172 qlog.frame(&frame);
4173 let span = match frame {
4174 Frame::Padding => continue,
4175 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4176 };
4177
4178 self.stats.frame_rx.record(&frame);
4179 match &frame {
4182 Frame::Crypto(f) => {
4183 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4184 }
4185 Frame::Stream(f) => {
4186 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4187 }
4188 Frame::Datagram(f) => {
4189 trace!(len = f.data.len(), "got datagram frame");
4190 }
4191 f => {
4192 trace!("got frame {f}");
4193 }
4194 }
4195
4196 let _guard = span.enter();
4197 if packet.header.is_0rtt() {
4198 match frame {
4199 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4200 return Err(TransportError::PROTOCOL_VIOLATION(
4201 "illegal frame type in 0-RTT",
4202 ));
4203 }
4204 _ => {
4205 if frame.is_1rtt() {
4206 return Err(TransportError::PROTOCOL_VIOLATION(
4207 "illegal frame type in 0-RTT",
4208 ));
4209 }
4210 }
4211 }
4212 }
4213 ack_eliciting |= frame.is_ack_eliciting();
4214
4215 match frame {
4217 Frame::Padding
4218 | Frame::PathChallenge(_)
4219 | Frame::PathResponse(_)
4220 | Frame::NewConnectionId(_)
4221 | Frame::ObservedAddr(_) => {}
4222 _ => {
4223 is_probing_packet = false;
4224 }
4225 }
4226
4227 match frame {
4228 Frame::Crypto(frame) => {
4229 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4230 }
4231 Frame::Stream(frame) => {
4232 if self.streams.received(frame, payload_len)?.should_transmit() {
4233 self.spaces[SpaceId::Data].pending.max_data = true;
4234 }
4235 }
4236 Frame::Ack(ack) => {
4237 self.on_ack_received(now, SpaceId::Data, ack)?;
4238 }
4239 Frame::PathAck(ack) => {
4240 span.record("path", tracing::field::debug(&ack.path_id));
4241 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4242 }
4243 Frame::Padding | Frame::Ping => {}
4244 Frame::Close(reason) => {
4245 close = Some(reason);
4246 }
4247 Frame::PathChallenge(challenge) => {
4248 let path = &mut self
4249 .path_mut(path_id)
4250 .expect("payload is processed only after the path becomes known");
4251 path.path_responses.push(number, challenge.0, remote);
4252 if remote == path.remote {
4253 match self.peer_supports_ack_frequency() {
4263 true => self.immediate_ack(path_id),
4264 false => {
4265 self.ping_path(path_id).ok();
4266 }
4267 }
4268 }
4269 }
4270 Frame::PathResponse(response) => {
4271 let path = self
4272 .paths
4273 .get_mut(&path_id)
4274 .expect("payload is processed only after the path becomes known");
4275
4276 use PathTimer::*;
4277 use paths::OnPathResponseReceived::*;
4278 match path.data.on_path_response_received(now, response.0, remote) {
4279 OnPath { was_open } => {
4280 let qlog = self.qlog.with_time(now);
4281
4282 self.timers
4283 .stop(Timer::PerPath(path_id, PathValidation), qlog.clone());
4284 self.timers
4285 .stop(Timer::PerPath(path_id, PathOpen), qlog.clone());
4286
4287 let next_challenge = path
4288 .data
4289 .earliest_expiring_challenge()
4290 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4291 self.timers.set_or_stop(
4292 Timer::PerPath(path_id, PathChallengeLost),
4293 next_challenge,
4294 qlog,
4295 );
4296
4297 if !was_open {
4298 self.events
4299 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4300 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4301 {
4302 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4303 id: path_id,
4304 addr: observed.socket_addr(),
4305 }));
4306 }
4307 }
4308 if let Some((_, ref mut prev)) = path.prev {
4309 prev.challenges_sent.clear();
4310 prev.send_new_challenge = false;
4311 }
4312 }
4313 OffPath => {
4314 debug!("Response to off-path PathChallenge!");
4315 let next_challenge = path
4316 .data
4317 .earliest_expiring_challenge()
4318 .map(|time| time + self.ack_frequency.max_ack_delay_for_pto());
4319 self.timers.set_or_stop(
4320 Timer::PerPath(path_id, PathChallengeLost),
4321 next_challenge,
4322 self.qlog.with_time(now),
4323 );
4324 }
4325 Invalid { expected } => {
4326 debug!(%response, from=%remote, %expected, "ignoring invalid PATH_RESPONSE")
4327 }
4328 Unknown => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4329 }
4330 }
4331 Frame::MaxData(bytes) => {
4332 self.streams.received_max_data(bytes);
4333 }
4334 Frame::MaxStreamData { id, offset } => {
4335 self.streams.received_max_stream_data(id, offset)?;
4336 }
4337 Frame::MaxStreams { dir, count } => {
4338 self.streams.received_max_streams(dir, count)?;
4339 }
4340 Frame::ResetStream(frame) => {
4341 if self.streams.received_reset(frame)?.should_transmit() {
4342 self.spaces[SpaceId::Data].pending.max_data = true;
4343 }
4344 }
4345 Frame::DataBlocked { offset } => {
4346 debug!(offset, "peer claims to be blocked at connection level");
4347 }
4348 Frame::StreamDataBlocked { id, offset } => {
4349 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4350 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4351 return Err(TransportError::STREAM_STATE_ERROR(
4352 "STREAM_DATA_BLOCKED on send-only stream",
4353 ));
4354 }
4355 debug!(
4356 stream = %id,
4357 offset, "peer claims to be blocked at stream level"
4358 );
4359 }
4360 Frame::StreamsBlocked { dir, limit } => {
4361 if limit > MAX_STREAM_COUNT {
4362 return Err(TransportError::FRAME_ENCODING_ERROR(
4363 "unrepresentable stream limit",
4364 ));
4365 }
4366 debug!(
4367 "peer claims to be blocked opening more than {} {} streams",
4368 limit, dir
4369 );
4370 }
4371 Frame::StopSending(frame::StopSending { id, error_code }) => {
4372 if id.initiator() != self.side.side() {
4373 if id.dir() == Dir::Uni {
4374 debug!("got STOP_SENDING on recv-only {}", id);
4375 return Err(TransportError::STREAM_STATE_ERROR(
4376 "STOP_SENDING on recv-only stream",
4377 ));
4378 }
4379 } else if self.streams.is_local_unopened(id) {
4380 return Err(TransportError::STREAM_STATE_ERROR(
4381 "STOP_SENDING on unopened stream",
4382 ));
4383 }
4384 self.streams.received_stop_sending(id, error_code);
4385 }
4386 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4387 if let Some(ref path_id) = path_id {
4388 span.record("path", tracing::field::debug(&path_id));
4389 }
4390 let path_id = path_id.unwrap_or_default();
4391 match self.local_cid_state.get_mut(&path_id) {
4392 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4393 Some(cid_state) => {
4394 let allow_more_cids = cid_state
4395 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4396
4397 let has_path = !self.abandoned_paths.contains(&path_id);
4401 let allow_more_cids = allow_more_cids && has_path;
4402
4403 self.endpoint_events
4404 .push_back(EndpointEventInner::RetireConnectionId(
4405 now,
4406 path_id,
4407 sequence,
4408 allow_more_cids,
4409 ));
4410 }
4411 }
4412 }
4413 Frame::NewConnectionId(frame) => {
4414 let path_id = if let Some(path_id) = frame.path_id {
4415 if !self.is_multipath_negotiated() {
4416 return Err(TransportError::PROTOCOL_VIOLATION(
4417 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4418 ));
4419 }
4420 if path_id > self.local_max_path_id {
4421 return Err(TransportError::PROTOCOL_VIOLATION(
4422 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4423 ));
4424 }
4425 path_id
4426 } else {
4427 PathId::ZERO
4428 };
4429
4430 if self.abandoned_paths.contains(&path_id) {
4431 trace!("ignoring issued CID for abandoned path");
4432 continue;
4433 }
4434 if let Some(ref path_id) = frame.path_id {
4435 span.record("path", tracing::field::debug(&path_id));
4436 }
4437 let rem_cids = self
4438 .rem_cids
4439 .entry(path_id)
4440 .or_insert_with(|| CidQueue::new(frame.id));
4441 if rem_cids.active().is_empty() {
4442 return Err(TransportError::PROTOCOL_VIOLATION(
4443 "NEW_CONNECTION_ID when CIDs aren't in use",
4444 ));
4445 }
4446 if frame.retire_prior_to > frame.sequence {
4447 return Err(TransportError::PROTOCOL_VIOLATION(
4448 "NEW_CONNECTION_ID retiring unissued CIDs",
4449 ));
4450 }
4451
4452 use crate::cid_queue::InsertError;
4453 match rem_cids.insert(frame) {
4454 Ok(None) if self.path(path_id).is_none() => {
4455 self.continue_nat_traversal_round(now);
4458 }
4459 Ok(None) => {}
4460 Ok(Some((retired, reset_token))) => {
4461 let pending_retired =
4462 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4463 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4466 if (pending_retired.len() as u64)
4469 .saturating_add(retired.end.saturating_sub(retired.start))
4470 > MAX_PENDING_RETIRED_CIDS
4471 {
4472 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4473 "queued too many retired CIDs",
4474 ));
4475 }
4476 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4477 self.set_reset_token(path_id, remote, reset_token);
4478 }
4479 Err(InsertError::ExceedsLimit) => {
4480 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4481 }
4482 Err(InsertError::Retired) => {
4483 trace!("discarding already-retired");
4484 self.spaces[SpaceId::Data]
4488 .pending
4489 .retire_cids
4490 .push((path_id, frame.sequence));
4491 continue;
4492 }
4493 };
4494
4495 if self.side.is_server()
4496 && path_id == PathId::ZERO
4497 && self
4498 .rem_cids
4499 .get(&PathId::ZERO)
4500 .map(|cids| cids.active_seq() == 0)
4501 .unwrap_or_default()
4502 {
4503 self.update_rem_cid(PathId::ZERO);
4506 }
4507 }
4508 Frame::NewToken(NewToken { token }) => {
4509 let ConnectionSide::Client {
4510 token_store,
4511 server_name,
4512 ..
4513 } = &self.side
4514 else {
4515 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4516 };
4517 if token.is_empty() {
4518 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4519 }
4520 trace!("got new token");
4521 token_store.insert(server_name, token);
4522 }
4523 Frame::Datagram(datagram) => {
4524 if self
4525 .datagrams
4526 .received(datagram, &self.config.datagram_receive_buffer_size)?
4527 {
4528 self.events.push_back(Event::DatagramReceived);
4529 }
4530 }
4531 Frame::AckFrequency(ack_frequency) => {
4532 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4535 continue;
4538 }
4539
4540 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4542 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4543
4544 if let Some(timeout) = space
4547 .pending_acks
4548 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4549 {
4550 self.timers.set(
4551 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4552 timeout,
4553 self.qlog.with_time(now),
4554 );
4555 }
4556 }
4557 }
4558 Frame::ImmediateAck => {
4559 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4561 pns.pending_acks.set_immediate_ack_required();
4562 }
4563 }
4564 Frame::HandshakeDone => {
4565 if self.side.is_server() {
4566 return Err(TransportError::PROTOCOL_VIOLATION(
4567 "client sent HANDSHAKE_DONE",
4568 ));
4569 }
4570 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4571 self.discard_space(now, SpaceId::Handshake);
4572 }
4573 self.events.push_back(Event::HandshakeConfirmed);
4574 trace!("handshake confirmed");
4575 }
4576 Frame::ObservedAddr(observed) => {
4577 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4579 if !self
4580 .peer_params
4581 .address_discovery_role
4582 .should_report(&self.config.address_discovery_role)
4583 {
4584 return Err(TransportError::PROTOCOL_VIOLATION(
4585 "received OBSERVED_ADDRESS frame when not negotiated",
4586 ));
4587 }
4588 if packet.header.space() != SpaceId::Data {
4590 return Err(TransportError::PROTOCOL_VIOLATION(
4591 "OBSERVED_ADDRESS frame outside data space",
4592 ));
4593 }
4594
4595 let path = self.path_data_mut(path_id);
4596 if remote == path.remote {
4597 if let Some(updated) = path.update_observed_addr_report(observed) {
4598 if path.open {
4599 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4600 id: path_id,
4601 addr: updated,
4602 }));
4603 }
4604 }
4606 } else {
4607 migration_observed_addr = Some(observed)
4609 }
4610 }
4611 Frame::PathAbandon(frame::PathAbandon {
4612 path_id,
4613 error_code,
4614 }) => {
4615 span.record("path", tracing::field::debug(&path_id));
4616 match self.close_path(now, path_id, error_code.into()) {
4618 Ok(()) => {
4619 trace!("peer abandoned path");
4620 }
4621 Err(ClosePathError::LastOpenPath) => {
4622 trace!("peer abandoned last path, closing connection");
4623 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4625 }
4626 Err(ClosePathError::ClosedPath) => {
4627 trace!("peer abandoned already closed path");
4628 }
4629 };
4630 if let Some(path) = self.paths.get_mut(&path_id) {
4635 if !matches!(path.data.abandon_state, AbandonState::ReceivedPathAbandon) {
4636 let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
4637 let pto = path.data.rtt.pto_base() + ack_delay;
4638 self.timers.set(
4639 Timer::PerPath(path_id, PathTimer::DiscardPath),
4640 now + 3 * pto,
4641 self.qlog.with_time(now),
4642 );
4643 path.data.abandon_state = AbandonState::ReceivedPathAbandon;
4645 }
4646 }
4647 }
4648 Frame::PathStatusAvailable(info) => {
4649 span.record("path", tracing::field::debug(&info.path_id));
4650 if self.is_multipath_negotiated() {
4651 self.on_path_status(
4652 info.path_id,
4653 PathStatus::Available,
4654 info.status_seq_no,
4655 );
4656 } else {
4657 return Err(TransportError::PROTOCOL_VIOLATION(
4658 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4659 ));
4660 }
4661 }
4662 Frame::PathStatusBackup(info) => {
4663 span.record("path", tracing::field::debug(&info.path_id));
4664 if self.is_multipath_negotiated() {
4665 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4666 } else {
4667 return Err(TransportError::PROTOCOL_VIOLATION(
4668 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4669 ));
4670 }
4671 }
4672 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4673 span.record("path", tracing::field::debug(&path_id));
4674 if !self.is_multipath_negotiated() {
4675 return Err(TransportError::PROTOCOL_VIOLATION(
4676 "received MAX_PATH_ID frame when multipath was not negotiated",
4677 ));
4678 }
4679 if path_id > self.remote_max_path_id {
4681 self.remote_max_path_id = path_id;
4682 self.issue_first_path_cids(now);
4683 while let Some(true) = self.continue_nat_traversal_round(now) {}
4684 }
4685 }
4686 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4687 if self.is_multipath_negotiated() {
4691 if self.local_max_path_id > max_path_id {
4692 return Err(TransportError::PROTOCOL_VIOLATION(
4693 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4694 ));
4695 }
4696 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4697 } else {
4699 return Err(TransportError::PROTOCOL_VIOLATION(
4700 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4701 ));
4702 }
4703 }
4704 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4705 if self.is_multipath_negotiated() {
4713 if path_id > self.local_max_path_id {
4714 return Err(TransportError::PROTOCOL_VIOLATION(
4715 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4716 ));
4717 }
4718 if next_seq.0
4719 > self
4720 .local_cid_state
4721 .get(&path_id)
4722 .map(|cid_state| cid_state.active_seq().1 + 1)
4723 .unwrap_or_default()
4724 {
4725 return Err(TransportError::PROTOCOL_VIOLATION(
4726 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4727 ));
4728 }
4729 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4730 } else {
4731 return Err(TransportError::PROTOCOL_VIOLATION(
4732 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4733 ));
4734 }
4735 }
4736 Frame::AddAddress(addr) => {
4737 let client_state = match self.iroh_hp.client_side_mut() {
4738 Ok(state) => state,
4739 Err(err) => {
4740 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4741 "Nat traversal(ADD_ADDRESS): {err}"
4742 )));
4743 }
4744 };
4745
4746 if !client_state.check_remote_address(&addr) {
4747 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4749 }
4750
4751 match client_state.add_remote_address(addr) {
4752 Ok(maybe_added) => {
4753 if let Some(added) = maybe_added {
4754 self.events.push_back(Event::NatTraversal(
4755 iroh_hp::Event::AddressAdded(added),
4756 ));
4757 }
4758 }
4759 Err(e) => {
4760 warn!(%e, "failed to add remote address")
4761 }
4762 }
4763 }
4764 Frame::RemoveAddress(addr) => {
4765 let client_state = match self.iroh_hp.client_side_mut() {
4766 Ok(state) => state,
4767 Err(err) => {
4768 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4769 "Nat traversal(REMOVE_ADDRESS): {err}"
4770 )));
4771 }
4772 };
4773 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4774 self.events
4775 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4776 removed_addr,
4777 )));
4778 }
4779 }
4780 Frame::ReachOut(reach_out) => {
4781 let server_state = match self.iroh_hp.server_side_mut() {
4782 Ok(state) => state,
4783 Err(err) => {
4784 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4785 "Nat traversal(REACH_OUT): {err}"
4786 )));
4787 }
4788 };
4789
4790 if let Err(err) = server_state.handle_reach_out(reach_out) {
4791 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4792 "Nat traversal(REACH_OUT): {err}"
4793 )));
4794 }
4795 }
4796 }
4797 }
4798
4799 let space = self.spaces[SpaceId::Data].for_path(path_id);
4800 if space
4801 .pending_acks
4802 .packet_received(now, number, ack_eliciting, &space.dedup)
4803 {
4804 if self.abandoned_paths.contains(&path_id) {
4805 space.pending_acks.set_immediate_ack_required();
4808 } else {
4809 self.timers.set(
4810 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4811 now + self.ack_frequency.max_ack_delay,
4812 self.qlog.with_time(now),
4813 );
4814 }
4815 }
4816
4817 let pending = &mut self.spaces[SpaceId::Data].pending;
4822 self.streams.queue_max_stream_id(pending);
4823
4824 if let Some(reason) = close {
4825 self.state.move_to_draining(Some(reason.into()));
4826 self.close = true;
4827 }
4828
4829 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4830 && !is_probing_packet
4831 && remote != self.path_data(path_id).remote
4832 {
4833 let ConnectionSide::Server { ref server_config } = self.side else {
4834 panic!("packets from unknown remote should be dropped by clients");
4835 };
4836 debug_assert!(
4837 server_config.migration,
4838 "migration-initiating packets should have been dropped immediately"
4839 );
4840 self.migrate(path_id, now, remote, migration_observed_addr);
4841 self.update_rem_cid(path_id);
4843 self.spin = false;
4844 }
4845
4846 Ok(())
4847 }
4848
4849 fn migrate(
4850 &mut self,
4851 path_id: PathId,
4852 now: Instant,
4853 remote: SocketAddr,
4854 observed_addr: Option<ObservedAddr>,
4855 ) {
4856 trace!(%remote, %path_id, "migration initiated");
4857 self.path_counter = self.path_counter.wrapping_add(1);
4858 let prev_pto = self.pto(SpaceId::Data, path_id);
4865 let known_path = self.paths.get_mut(&path_id).expect("known path");
4866 let path = &mut known_path.data;
4867 let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4868 PathData::from_previous(remote, path, self.path_counter, now)
4869 } else {
4870 let peer_max_udp_payload_size =
4871 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4872 .unwrap_or(u16::MAX);
4873 PathData::new(
4874 remote,
4875 self.allow_mtud,
4876 Some(peer_max_udp_payload_size),
4877 self.path_counter,
4878 now,
4879 &self.config,
4880 )
4881 };
4882 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4883 if let Some(report) = observed_addr {
4884 if let Some(updated) = new_path.update_observed_addr_report(report) {
4885 tracing::info!("adding observed addr event from migration");
4886 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4887 id: path_id,
4888 addr: updated,
4889 }));
4890 }
4891 }
4892 new_path.send_new_challenge = true;
4893
4894 let mut prev = mem::replace(path, new_path);
4895 if !prev.is_validating_path() {
4897 prev.send_new_challenge = true;
4898 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4902 }
4903
4904 self.qlog.emit_tuple_assigned(path_id, remote, now);
4906
4907 self.timers.set(
4908 Timer::PerPath(path_id, PathTimer::PathValidation),
4909 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4910 self.qlog.with_time(now),
4911 );
4912 }
4913
4914 pub fn local_address_changed(&mut self) {
4916 self.update_rem_cid(PathId::ZERO);
4918 self.ping();
4919 }
4920
4921 fn update_rem_cid(&mut self, path_id: PathId) {
4923 let Some((reset_token, retired)) =
4924 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4925 else {
4926 return;
4927 };
4928
4929 self.spaces[SpaceId::Data]
4931 .pending
4932 .retire_cids
4933 .extend(retired.map(|seq| (path_id, seq)));
4934 let remote = self.path_data(path_id).remote;
4935 self.set_reset_token(path_id, remote, reset_token);
4936 }
4937
4938 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4947 self.endpoint_events
4948 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4949
4950 if path_id == PathId::ZERO {
4956 self.peer_params.stateless_reset_token = Some(reset_token);
4957 }
4958 }
4959
4960 fn issue_first_cids(&mut self, now: Instant) {
4962 if self
4963 .local_cid_state
4964 .get(&PathId::ZERO)
4965 .expect("PathId::ZERO exists when the connection is created")
4966 .cid_len()
4967 == 0
4968 {
4969 return;
4970 }
4971
4972 let mut n = self.peer_params.issue_cids_limit() - 1;
4974 if let ConnectionSide::Server { server_config } = &self.side {
4975 if server_config.has_preferred_address() {
4976 n -= 1;
4978 }
4979 }
4980 self.endpoint_events
4981 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
4982 }
4983
4984 fn issue_first_path_cids(&mut self, now: Instant) {
4988 if let Some(max_path_id) = self.max_path_id() {
4989 let mut path_id = self.max_path_id_with_cids.next();
4990 while path_id <= max_path_id {
4991 self.endpoint_events
4992 .push_back(EndpointEventInner::NeedIdentifiers(
4993 path_id,
4994 now,
4995 self.peer_params.issue_cids_limit(),
4996 ));
4997 path_id = path_id.next();
4998 }
4999 self.max_path_id_with_cids = max_path_id;
5000 }
5001 }
5002
5003 fn populate_packet(
5011 &mut self,
5012 now: Instant,
5013 space_id: SpaceId,
5014 path_id: PathId,
5015 path_exclusive_only: bool,
5016 buf: &mut impl BufMut,
5017 pn: u64,
5018 #[allow(unused)] qlog: &mut QlogSentPacket,
5019 ) -> SentFrames {
5020 let mut sent = SentFrames::default();
5021 let is_multipath_negotiated = self.is_multipath_negotiated();
5022 let space = &mut self.spaces[space_id];
5023 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5024 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5025 space
5026 .for_path(path_id)
5027 .pending_acks
5028 .maybe_ack_non_eliciting();
5029
5030 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5032 trace!("HANDSHAKE_DONE");
5033 buf.write(frame::FrameType::HANDSHAKE_DONE);
5034 qlog.frame(&Frame::HandshakeDone);
5035 sent.retransmits.get_or_create().handshake_done = true;
5036 self.stats.frame_tx.handshake_done =
5038 self.stats.frame_tx.handshake_done.saturating_add(1);
5039 }
5040
5041 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5044 while let Some(local_addr) = addresses.pop() {
5045 let reach_out = frame::ReachOut::new(*round, local_addr);
5046 if buf.remaining_mut() > reach_out.size() {
5047 trace!(%round, ?local_addr, "REACH_OUT");
5048 reach_out.write(buf);
5049 let sent_reachouts = sent
5050 .retransmits
5051 .get_or_create()
5052 .reach_out
5053 .get_or_insert_with(|| (*round, Default::default()));
5054 sent_reachouts.1.push(local_addr);
5055 self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5056 qlog.frame(&Frame::ReachOut(reach_out));
5057 } else {
5058 addresses.push(local_addr);
5059 break;
5060 }
5061 }
5062 if addresses.is_empty() {
5063 space.pending.reach_out = None;
5064 }
5065 }
5066
5067 if !path_exclusive_only
5069 && space_id == SpaceId::Data
5070 && self
5071 .config
5072 .address_discovery_role
5073 .should_report(&self.peer_params.address_discovery_role)
5074 && (!path.observed_addr_sent || space.pending.observed_addr)
5075 {
5076 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5077 if buf.remaining_mut() > frame.size() {
5078 trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5079 frame.write(buf);
5080
5081 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5082 path.observed_addr_sent = true;
5083
5084 self.stats.frame_tx.observed_addr += 1;
5085 sent.retransmits.get_or_create().observed_addr = true;
5086 space.pending.observed_addr = false;
5087 qlog.frame(&Frame::ObservedAddr(frame));
5088 }
5089 }
5090
5091 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5093 trace!("PING");
5094 buf.write(frame::FrameType::PING);
5095 sent.non_retransmits = true;
5096 self.stats.frame_tx.ping += 1;
5097 qlog.frame(&Frame::Ping);
5098 }
5099
5100 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5102 debug_assert_eq!(
5103 space_id,
5104 SpaceId::Data,
5105 "immediate acks must be sent in the data space"
5106 );
5107 trace!("IMMEDIATE_ACK");
5108 buf.write(frame::FrameType::IMMEDIATE_ACK);
5109 sent.non_retransmits = true;
5110 self.stats.frame_tx.immediate_ack += 1;
5111 qlog.frame(&Frame::ImmediateAck);
5112 }
5113
5114 if !path_exclusive_only {
5118 for path_id in space
5119 .number_spaces
5120 .iter_mut()
5121 .filter(|(_, pns)| pns.pending_acks.can_send())
5122 .map(|(&path_id, _)| path_id)
5123 .collect::<Vec<_>>()
5124 {
5125 Self::populate_acks(
5126 now,
5127 self.receiving_ecn,
5128 &mut sent,
5129 path_id,
5130 space_id,
5131 space,
5132 is_multipath_negotiated,
5133 buf,
5134 &mut self.stats,
5135 qlog,
5136 );
5137 }
5138 }
5139
5140 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5142 let sequence_number = self.ack_frequency.next_sequence_number();
5143
5144 let config = self.config.ack_frequency_config.as_ref().unwrap();
5146
5147 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5149 path.rtt.get(),
5150 config,
5151 &self.peer_params,
5152 );
5153
5154 trace!(?max_ack_delay, "ACK_FREQUENCY");
5155
5156 let frame = frame::AckFrequency {
5157 sequence: sequence_number,
5158 ack_eliciting_threshold: config.ack_eliciting_threshold,
5159 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5160 reordering_threshold: config.reordering_threshold,
5161 };
5162 frame.encode(buf);
5163 qlog.frame(&Frame::AckFrequency(frame));
5164
5165 sent.retransmits.get_or_create().ack_frequency = true;
5166
5167 self.ack_frequency
5168 .ack_frequency_sent(path_id, pn, max_ack_delay);
5169 self.stats.frame_tx.ack_frequency += 1;
5170 }
5171
5172 if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5174 && space_id == SpaceId::Data
5175 && path.send_new_challenge
5176 && !self.state.is_closed()
5177 {
5179 path.send_new_challenge = false;
5180
5181 let token = self.rng.random();
5183 let info = paths::SentChallengeInfo {
5184 sent_instant: now,
5185 remote: path.remote,
5186 };
5187 path.challenges_sent.insert(token, info);
5188 sent.non_retransmits = true;
5189 sent.requires_padding = true;
5190 let challenge = frame::PathChallenge(token);
5191 trace!(frame = %challenge);
5192 buf.write(challenge);
5193 qlog.frame(&Frame::PathChallenge(challenge));
5194 self.stats.frame_tx.path_challenge += 1;
5195 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5196 self.timers.set(
5197 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5198 now + pto,
5199 self.qlog.with_time(now),
5200 );
5201
5202 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5203 space.pending.path_status.insert(path_id);
5205 }
5206
5207 if space_id == SpaceId::Data
5210 && self
5211 .config
5212 .address_discovery_role
5213 .should_report(&self.peer_params.address_discovery_role)
5214 {
5215 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5216 if buf.remaining_mut() > frame.size() {
5217 frame.write(buf);
5218 qlog.frame(&Frame::ObservedAddr(frame));
5219
5220 self.next_observed_addr_seq_no =
5221 self.next_observed_addr_seq_no.saturating_add(1u8);
5222 path.observed_addr_sent = true;
5223
5224 self.stats.frame_tx.observed_addr += 1;
5225 sent.retransmits.get_or_create().observed_addr = true;
5226 space.pending.observed_addr = false;
5227 }
5228 }
5229 }
5230
5231 if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5233 if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5234 sent.non_retransmits = true;
5235 sent.requires_padding = true;
5236 let response = frame::PathResponse(token);
5237 trace!(frame = %response);
5238 buf.write(response);
5239 qlog.frame(&Frame::PathResponse(response));
5240 self.stats.frame_tx.path_response += 1;
5241
5242 if space_id == SpaceId::Data
5246 && self
5247 .config
5248 .address_discovery_role
5249 .should_report(&self.peer_params.address_discovery_role)
5250 {
5251 let frame =
5252 frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5253 if buf.remaining_mut() > frame.size() {
5254 frame.write(buf);
5255 qlog.frame(&Frame::ObservedAddr(frame));
5256
5257 self.next_observed_addr_seq_no =
5258 self.next_observed_addr_seq_no.saturating_add(1u8);
5259 path.observed_addr_sent = true;
5260
5261 self.stats.frame_tx.observed_addr += 1;
5262 sent.retransmits.get_or_create().observed_addr = true;
5263 space.pending.observed_addr = false;
5264 }
5265 }
5266 }
5267 }
5268
5269 while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5271 let mut frame = match space.pending.crypto.pop_front() {
5272 Some(x) => x,
5273 None => break,
5274 };
5275
5276 let max_crypto_data_size = buf.remaining_mut()
5281 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5283 - 2; let len = frame
5286 .data
5287 .len()
5288 .min(2usize.pow(14) - 1)
5289 .min(max_crypto_data_size);
5290
5291 let data = frame.data.split_to(len);
5292 let truncated = frame::Crypto {
5293 offset: frame.offset,
5294 data,
5295 };
5296 trace!(
5297 "CRYPTO: off {} len {}",
5298 truncated.offset,
5299 truncated.data.len()
5300 );
5301 truncated.encode(buf);
5302 self.stats.frame_tx.crypto += 1;
5303
5304 #[cfg(feature = "qlog")]
5306 qlog.frame(&Frame::Crypto(truncated.clone()));
5307 sent.retransmits.get_or_create().crypto.push_back(truncated);
5308 if !frame.data.is_empty() {
5309 frame.offset += len as u64;
5310 space.pending.crypto.push_front(frame);
5311 }
5312 }
5313
5314 while !path_exclusive_only
5317 && space_id == SpaceId::Data
5318 && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5319 {
5320 let Some((abandoned_path_id, error_code)) = space.pending.path_abandon.pop_first()
5321 else {
5322 break;
5323 };
5324 let frame = frame::PathAbandon {
5325 path_id: abandoned_path_id,
5326 error_code,
5327 };
5328 frame.encode(buf);
5329 qlog.frame(&Frame::PathAbandon(frame));
5330 self.stats.frame_tx.path_abandon += 1;
5331 trace!(%abandoned_path_id, "PATH_ABANDON");
5332 sent.retransmits
5333 .get_or_create()
5334 .path_abandon
5335 .entry(abandoned_path_id)
5336 .or_insert(error_code);
5337
5338 let ack_delay = self.ack_frequency.max_ack_delay_for_pto();
5339 let send_pto = self.paths.get(&path_id).unwrap().data.rtt.pto_base() + ack_delay;
5341 if let Some(abandoned_path) = self.paths.get_mut(&abandoned_path_id) {
5342 if matches!(
5345 abandoned_path.data.abandon_state,
5346 AbandonState::NotAbandoned
5347 ) {
5348 abandoned_path.data.abandon_state = AbandonState::ExpectingPathAbandon {
5354 deadline: now + 3 * send_pto,
5355 };
5356
5357 let abandoned_pto =
5367 self.paths.get(&path_id).unwrap().data.rtt.pto_base() + ack_delay;
5368 self.timers.set(
5369 Timer::PerPath(abandoned_path_id, PathTimer::DiscardPath),
5370 now + 3 * send_pto + 3 * abandoned_pto,
5371 self.qlog.with_time(now),
5372 );
5373 }
5374 } else {
5375 warn!("sent PATH_ABANDON after path was already discarded");
5376 }
5377 }
5378
5379 while !path_exclusive_only
5381 && space_id == SpaceId::Data
5382 && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5383 {
5384 let Some(path_id) = space.pending.path_status.pop_first() else {
5385 break;
5386 };
5387 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5388 trace!(%path_id, "discarding queued path status for unknown path");
5389 continue;
5390 };
5391
5392 let seq = path.status.seq();
5393 sent.retransmits.get_or_create().path_status.insert(path_id);
5394 match path.local_status() {
5395 PathStatus::Available => {
5396 let frame = frame::PathStatusAvailable {
5397 path_id,
5398 status_seq_no: seq,
5399 };
5400 frame.encode(buf);
5401 qlog.frame(&Frame::PathStatusAvailable(frame));
5402 self.stats.frame_tx.path_status_available += 1;
5403 trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5404 }
5405 PathStatus::Backup => {
5406 let frame = frame::PathStatusBackup {
5407 path_id,
5408 status_seq_no: seq,
5409 };
5410 frame.encode(buf);
5411 qlog.frame(&Frame::PathStatusBackup(frame));
5412 self.stats.frame_tx.path_status_backup += 1;
5413 trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5414 }
5415 }
5416 }
5417
5418 if space_id == SpaceId::Data
5420 && space.pending.max_path_id
5421 && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5422 {
5423 let frame = frame::MaxPathId(self.local_max_path_id);
5424 frame.encode(buf);
5425 qlog.frame(&Frame::MaxPathId(frame));
5426 space.pending.max_path_id = false;
5427 sent.retransmits.get_or_create().max_path_id = true;
5428 trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5429 self.stats.frame_tx.max_path_id += 1;
5430 }
5431
5432 if space_id == SpaceId::Data
5434 && space.pending.paths_blocked
5435 && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5436 {
5437 let frame = frame::PathsBlocked(self.remote_max_path_id);
5438 frame.encode(buf);
5439 qlog.frame(&Frame::PathsBlocked(frame));
5440 space.pending.paths_blocked = false;
5441 sent.retransmits.get_or_create().paths_blocked = true;
5442 trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5443 self.stats.frame_tx.paths_blocked += 1;
5444 }
5445
5446 while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5448 {
5449 let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5450 break;
5451 };
5452 let next_seq = match self.rem_cids.get(&path_id) {
5453 Some(cid_queue) => cid_queue.active_seq() + 1,
5454 None => 0,
5455 };
5456 let frame = frame::PathCidsBlocked {
5457 path_id,
5458 next_seq: VarInt(next_seq),
5459 };
5460 frame.encode(buf);
5461 qlog.frame(&Frame::PathCidsBlocked(frame));
5462 sent.retransmits
5463 .get_or_create()
5464 .path_cids_blocked
5465 .push(path_id);
5466 trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5467 self.stats.frame_tx.path_cids_blocked += 1;
5468 }
5469
5470 if space_id == SpaceId::Data {
5472 self.streams.write_control_frames(
5473 buf,
5474 &mut space.pending,
5475 &mut sent.retransmits,
5476 &mut self.stats.frame_tx,
5477 qlog,
5478 );
5479 }
5480
5481 let cid_len = self
5483 .local_cid_state
5484 .values()
5485 .map(|cid_state| cid_state.cid_len())
5486 .max()
5487 .expect("some local CID state must exist");
5488 let new_cid_size_bound =
5489 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5490 while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5491 let issued = match space.pending.new_cids.pop() {
5492 Some(x) => x,
5493 None => break,
5494 };
5495 let retire_prior_to = self
5496 .local_cid_state
5497 .get(&issued.path_id)
5498 .map(|cid_state| cid_state.retire_prior_to())
5499 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5500
5501 let cid_path_id = match is_multipath_negotiated {
5502 true => {
5503 trace!(
5504 path_id = ?issued.path_id,
5505 sequence = issued.sequence,
5506 id = %issued.id,
5507 "PATH_NEW_CONNECTION_ID",
5508 );
5509 self.stats.frame_tx.path_new_connection_id += 1;
5510 Some(issued.path_id)
5511 }
5512 false => {
5513 trace!(
5514 sequence = issued.sequence,
5515 id = %issued.id,
5516 "NEW_CONNECTION_ID"
5517 );
5518 debug_assert_eq!(issued.path_id, PathId::ZERO);
5519 self.stats.frame_tx.new_connection_id += 1;
5520 None
5521 }
5522 };
5523 let frame = frame::NewConnectionId {
5524 path_id: cid_path_id,
5525 sequence: issued.sequence,
5526 retire_prior_to,
5527 id: issued.id,
5528 reset_token: issued.reset_token,
5529 };
5530 frame.encode(buf);
5531 sent.retransmits.get_or_create().new_cids.push(issued);
5532 qlog.frame(&Frame::NewConnectionId(frame));
5533 }
5534
5535 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5537 while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5538 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5539 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5540 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5541 self.stats.frame_tx.retire_connection_id += 1;
5542 (None, seq)
5543 }
5544 Some((path_id, seq)) => {
5545 trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5546 self.stats.frame_tx.path_retire_connection_id += 1;
5547 (Some(path_id), seq)
5548 }
5549 None => break,
5550 };
5551 let frame = frame::RetireConnectionId { path_id, sequence };
5552 frame.encode(buf);
5553 qlog.frame(&Frame::RetireConnectionId(frame));
5554 sent.retransmits
5555 .get_or_create()
5556 .retire_cids
5557 .push((path_id.unwrap_or_default(), sequence));
5558 }
5559
5560 let mut sent_datagrams = false;
5562 while !path_exclusive_only
5563 && buf.remaining_mut() > Datagram::SIZE_BOUND
5564 && space_id == SpaceId::Data
5565 {
5566 let prev_remaining = buf.remaining_mut();
5567 match self.datagrams.write(buf) {
5568 true => {
5569 sent_datagrams = true;
5570 sent.non_retransmits = true;
5571 self.stats.frame_tx.datagram += 1;
5572 qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5573 }
5574 false => break,
5575 }
5576 }
5577 if self.datagrams.send_blocked && sent_datagrams {
5578 self.events.push_back(Event::DatagramsUnblocked);
5579 self.datagrams.send_blocked = false;
5580 }
5581
5582 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5583
5584 if !path_exclusive_only {
5586 while let Some(remote_addr) = space.pending.new_tokens.pop() {
5587 debug_assert_eq!(space_id, SpaceId::Data);
5588 let ConnectionSide::Server { server_config } = &self.side else {
5589 panic!("NEW_TOKEN frames should not be enqueued by clients");
5590 };
5591
5592 if remote_addr != path.remote {
5593 continue;
5598 }
5599
5600 let token = Token::new(
5601 TokenPayload::Validation {
5602 ip: remote_addr.ip(),
5603 issued: server_config.time_source.now(),
5604 },
5605 &mut self.rng,
5606 );
5607 let new_token = NewToken {
5608 token: token.encode(&*server_config.token_key).into(),
5609 };
5610
5611 if buf.remaining_mut() < new_token.size() {
5612 space.pending.new_tokens.push(remote_addr);
5613 break;
5614 }
5615
5616 trace!("NEW_TOKEN");
5617 new_token.encode(buf);
5618 qlog.frame(&Frame::NewToken(new_token));
5619 sent.retransmits
5620 .get_or_create()
5621 .new_tokens
5622 .push(remote_addr);
5623 self.stats.frame_tx.new_token += 1;
5624 }
5625 }
5626
5627 if !path_exclusive_only && space_id == SpaceId::Data {
5629 sent.stream_frames =
5630 self.streams
5631 .write_stream_frames(buf, self.config.send_fairness, qlog);
5632 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5633 }
5634
5635 while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5638 if let Some(added_address) = space.pending.add_address.pop_last() {
5639 trace!(
5640 seq = %added_address.seq_no,
5641 ip = ?added_address.ip,
5642 port = added_address.port,
5643 "ADD_ADDRESS",
5644 );
5645 added_address.write(buf);
5646 sent.retransmits
5647 .get_or_create()
5648 .add_address
5649 .insert(added_address);
5650 self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5651 qlog.frame(&Frame::AddAddress(added_address));
5652 } else {
5653 break;
5654 }
5655 }
5656
5657 while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5659 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5660 trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5661 removed_address.write(buf);
5662 sent.retransmits
5663 .get_or_create()
5664 .remove_address
5665 .insert(removed_address);
5666 self.stats.frame_tx.remove_address =
5667 self.stats.frame_tx.remove_address.saturating_add(1);
5668 qlog.frame(&Frame::RemoveAddress(removed_address));
5669 } else {
5670 break;
5671 }
5672 }
5673
5674 sent
5675 }
5676
5677 fn populate_acks(
5679 now: Instant,
5680 receiving_ecn: bool,
5681 sent: &mut SentFrames,
5682 path_id: PathId,
5683 space_id: SpaceId,
5684 space: &mut PacketSpace,
5685 is_multipath_negotiated: bool,
5686 buf: &mut impl BufMut,
5687 stats: &mut ConnectionStats,
5688 #[allow(unused)] qlog: &mut QlogSentPacket,
5689 ) {
5690 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5692
5693 debug_assert!(
5694 is_multipath_negotiated || path_id == PathId::ZERO,
5695 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5696 );
5697 if is_multipath_negotiated {
5698 debug_assert!(
5699 space_id == SpaceId::Data || path_id == PathId::ZERO,
5700 "path acks must be sent in 1RTT space (have {space_id:?})"
5701 );
5702 }
5703
5704 let pns = space.for_path(path_id);
5705 let ranges = pns.pending_acks.ranges();
5706 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5707 let ecn = if receiving_ecn {
5708 Some(&pns.ecn_counters)
5709 } else {
5710 None
5711 };
5712 if let Some(max) = ranges.max() {
5713 sent.largest_acked.insert(path_id, max);
5714 }
5715
5716 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5717 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5719 let delay = delay_micros >> ack_delay_exp.into_inner();
5720
5721 if is_multipath_negotiated && space_id == SpaceId::Data {
5722 if !ranges.is_empty() {
5723 trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5724 frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5725 qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5726 stats.frame_tx.path_acks += 1;
5727 }
5728 } else {
5729 trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5730 frame::Ack::encode(delay as _, ranges, ecn, buf);
5731 stats.frame_tx.acks += 1;
5732 qlog.frame_ack(delay, ranges, ecn);
5733 }
5734 }
5735
5736 fn close_common(&mut self) {
5737 trace!("connection closed");
5738 self.timers.reset();
5739 }
5740
5741 fn set_close_timer(&mut self, now: Instant) {
5742 let pto_max = self.pto_max_path(self.highest_space, true);
5745 self.timers.set(
5746 Timer::Conn(ConnTimer::Close),
5747 now + 3 * pto_max,
5748 self.qlog.with_time(now),
5749 );
5750 }
5751
5752 fn handle_peer_params(
5757 &mut self,
5758 params: TransportParameters,
5759 loc_cid: ConnectionId,
5760 rem_cid: ConnectionId,
5761 now: Instant,
5762 ) -> Result<(), TransportError> {
5763 if Some(self.orig_rem_cid) != params.initial_src_cid
5764 || (self.side.is_client()
5765 && (Some(self.initial_dst_cid) != params.original_dst_cid
5766 || self.retry_src_cid != params.retry_src_cid))
5767 {
5768 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5769 "CID authentication failure",
5770 ));
5771 }
5772 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5773 return Err(TransportError::PROTOCOL_VIOLATION(
5774 "multipath must not use zero-length CIDs",
5775 ));
5776 }
5777
5778 self.set_peer_params(params);
5779 self.qlog.emit_peer_transport_params_received(self, now);
5780
5781 Ok(())
5782 }
5783
5784 fn set_peer_params(&mut self, params: TransportParameters) {
5785 self.streams.set_params(¶ms);
5786 self.idle_timeout =
5787 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5788 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5789
5790 if let Some(ref info) = params.preferred_address {
5791 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5793 path_id: None,
5794 sequence: 1,
5795 id: info.connection_id,
5796 reset_token: info.stateless_reset_token,
5797 retire_prior_to: 0,
5798 })
5799 .expect(
5800 "preferred address CID is the first received, and hence is guaranteed to be legal",
5801 );
5802 let remote = self.path_data(PathId::ZERO).remote;
5803 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5804 }
5805 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5806
5807 let mut multipath_enabled = None;
5808 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5809 self.config.get_initial_max_path_id(),
5810 params.initial_max_path_id,
5811 ) {
5812 self.local_max_path_id = local_max_path_id;
5814 self.remote_max_path_id = remote_max_path_id;
5815 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5816 debug!(%initial_max_path_id, "multipath negotiated");
5817 multipath_enabled = Some(initial_max_path_id);
5818 }
5819
5820 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5821 self.config
5822 .max_remote_nat_traversal_addresses
5823 .zip(params.max_remote_nat_traversal_addresses)
5824 {
5825 if let Some(max_initial_paths) =
5826 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5827 {
5828 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5829 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5830 self.iroh_hp =
5831 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5832 debug!(
5833 %max_remote_addresses, %max_local_addresses,
5834 "iroh hole punching negotiated"
5835 );
5836
5837 match self.side() {
5838 Side::Client => {
5839 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5840 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5843 } else if max_local_addresses as u64
5844 > params.active_connection_id_limit.into_inner()
5845 {
5846 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5850 }
5851 }
5852 Side::Server => {
5853 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5854 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5855 }
5856 }
5857 }
5858 } else {
5859 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5860 }
5861 }
5862
5863 self.peer_params = params;
5864 let peer_max_udp_payload_size =
5865 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5866 self.path_data_mut(PathId::ZERO)
5867 .mtud
5868 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5869 }
5870
5871 fn decrypt_packet(
5873 &mut self,
5874 now: Instant,
5875 path_id: PathId,
5876 packet: &mut Packet,
5877 ) -> Result<Option<u64>, Option<TransportError>> {
5878 let result = packet_crypto::decrypt_packet_body(
5879 packet,
5880 path_id,
5881 &self.spaces,
5882 self.zero_rtt_crypto.as_ref(),
5883 self.key_phase,
5884 self.prev_crypto.as_ref(),
5885 self.next_crypto.as_ref(),
5886 )?;
5887
5888 let result = match result {
5889 Some(r) => r,
5890 None => return Ok(None),
5891 };
5892
5893 if result.outgoing_key_update_acked {
5894 if let Some(prev) = self.prev_crypto.as_mut() {
5895 prev.end_packet = Some((result.number, now));
5896 self.set_key_discard_timer(now, packet.header.space());
5897 }
5898 }
5899
5900 if result.incoming_key_update {
5901 trace!("key update authenticated");
5902 self.update_keys(Some((result.number, now)), true);
5903 self.set_key_discard_timer(now, packet.header.space());
5904 }
5905
5906 Ok(Some(result.number))
5907 }
5908
5909 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
5910 trace!("executing key update");
5911 let new = self
5915 .crypto
5916 .next_1rtt_keys()
5917 .expect("only called for `Data` packets");
5918 self.key_phase_size = new
5919 .local
5920 .confidentiality_limit()
5921 .saturating_sub(KEY_UPDATE_MARGIN);
5922 let old = mem::replace(
5923 &mut self.spaces[SpaceId::Data]
5924 .crypto
5925 .as_mut()
5926 .unwrap() .packet,
5928 mem::replace(self.next_crypto.as_mut().unwrap(), new),
5929 );
5930 self.spaces[SpaceId::Data]
5931 .iter_paths_mut()
5932 .for_each(|s| s.sent_with_keys = 0);
5933 self.prev_crypto = Some(PrevCrypto {
5934 crypto: old,
5935 end_packet,
5936 update_unacked: remote,
5937 });
5938 self.key_phase = !self.key_phase;
5939 }
5940
5941 fn peer_supports_ack_frequency(&self) -> bool {
5942 self.peer_params.min_ack_delay.is_some()
5943 }
5944
5945 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5950 debug_assert_eq!(
5951 self.highest_space,
5952 SpaceId::Data,
5953 "immediate ack must be written in the data space"
5954 );
5955 self.spaces[self.highest_space]
5956 .for_path(path_id)
5957 .immediate_ack_pending = true;
5958 }
5959
5960 #[cfg(test)]
5962 pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
5963 let (path_id, first_decode, remaining) = match &event.0 {
5964 ConnectionEventInner::Datagram(DatagramConnectionEvent {
5965 path_id,
5966 first_decode,
5967 remaining,
5968 ..
5969 }) => (path_id, first_decode, remaining),
5970 _ => return None,
5971 };
5972
5973 if remaining.is_some() {
5974 panic!("Packets should never be coalesced in tests");
5975 }
5976
5977 let decrypted_header = packet_crypto::unprotect_header(
5978 first_decode.clone(),
5979 &self.spaces,
5980 self.zero_rtt_crypto.as_ref(),
5981 self.peer_params.stateless_reset_token,
5982 )?;
5983
5984 let mut packet = decrypted_header.packet?;
5985 packet_crypto::decrypt_packet_body(
5986 &mut packet,
5987 *path_id,
5988 &self.spaces,
5989 self.zero_rtt_crypto.as_ref(),
5990 self.key_phase,
5991 self.prev_crypto.as_ref(),
5992 self.next_crypto.as_ref(),
5993 )
5994 .ok()?;
5995
5996 Some(packet.payload.to_vec())
5997 }
5998
5999 #[cfg(test)]
6002 pub(crate) fn bytes_in_flight(&self) -> u64 {
6003 self.path_data(PathId::ZERO).in_flight.bytes
6005 }
6006
6007 #[cfg(test)]
6009 pub(crate) fn congestion_window(&self) -> u64 {
6010 let path = self.path_data(PathId::ZERO);
6011 path.congestion
6012 .window()
6013 .saturating_sub(path.in_flight.bytes)
6014 }
6015
6016 #[cfg(test)]
6018 pub(crate) fn is_idle(&self) -> bool {
6019 let current_timers = self.timers.values();
6020 current_timers
6021 .into_iter()
6022 .filter(|(timer, _)| {
6023 !matches!(
6024 timer,
6025 Timer::Conn(ConnTimer::KeepAlive)
6026 | Timer::PerPath(_, PathTimer::PathKeepAlive)
6027 | Timer::Conn(ConnTimer::PushNewCid)
6028 | Timer::Conn(ConnTimer::KeyDiscard)
6029 )
6030 })
6031 .min_by_key(|(_, time)| *time)
6032 .is_none_or(|(timer, _)| timer == Timer::Conn(ConnTimer::Idle))
6033 }
6034
6035 #[cfg(test)]
6037 pub(crate) fn using_ecn(&self) -> bool {
6038 self.path_data(PathId::ZERO).sending_ecn
6039 }
6040
6041 #[cfg(test)]
6043 pub(crate) fn total_recvd(&self) -> u64 {
6044 self.path_data(PathId::ZERO).total_recvd
6045 }
6046
6047 #[cfg(test)]
6048 pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
6049 self.local_cid_state
6050 .get(&PathId::ZERO)
6051 .unwrap()
6052 .active_seq()
6053 }
6054
6055 #[cfg(test)]
6056 #[track_caller]
6057 pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
6058 self.local_cid_state
6059 .get(&PathId(path_id))
6060 .unwrap()
6061 .active_seq()
6062 }
6063
6064 #[cfg(test)]
6067 pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
6068 let n = self
6069 .local_cid_state
6070 .get_mut(&PathId::ZERO)
6071 .unwrap()
6072 .assign_retire_seq(v);
6073 self.endpoint_events
6074 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
6075 }
6076
6077 #[cfg(test)]
6079 pub(crate) fn active_rem_cid_seq(&self) -> u64 {
6080 self.rem_cids.get(&PathId::ZERO).unwrap().active_seq()
6081 }
6082
6083 #[cfg(test)]
6085 pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
6086 self.path_data(path_id).current_mtu()
6087 }
6088
6089 #[cfg(test)]
6091 pub(crate) fn trigger_path_validation(&mut self) {
6092 for path in self.paths.values_mut() {
6093 path.data.send_new_challenge = true;
6094 }
6095 }
6096
6097 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6108 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6109 path.data.send_new_challenge
6110 || path
6111 .prev
6112 .as_ref()
6113 .is_some_and(|(_, path)| path.send_new_challenge)
6114 || !path.data.path_responses.is_empty()
6115 });
6116 let other = self.streams.can_send_stream_data()
6117 || self
6118 .datagrams
6119 .outgoing
6120 .front()
6121 .is_some_and(|x| x.size(true) <= max_size);
6122 SendableFrames {
6123 acks: false,
6124 other,
6125 close: false,
6126 path_exclusive,
6127 }
6128 }
6129
6130 fn kill(&mut self, reason: ConnectionError) {
6132 self.close_common();
6133 self.state.move_to_drained(Some(reason));
6134 self.endpoint_events.push_back(EndpointEventInner::Drained);
6135 }
6136
6137 pub fn current_mtu(&self) -> u16 {
6144 self.paths
6145 .iter()
6146 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6147 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6148 .min()
6149 .expect("There is always at least one available path")
6150 }
6151
6152 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6159 let pn_len = PacketNumber::new(
6160 pn,
6161 self.spaces[SpaceId::Data]
6162 .for_path(path)
6163 .largest_acked_packet
6164 .unwrap_or(0),
6165 )
6166 .len();
6167
6168 1 + self
6170 .rem_cids
6171 .get(&path)
6172 .map(|cids| cids.active().len())
6173 .unwrap_or(20) + pn_len
6175 + self.tag_len_1rtt()
6176 }
6177
6178 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6179 let pn_len = 4;
6180
6181 let cid_len = self
6182 .rem_cids
6183 .values()
6184 .map(|cids| cids.active().len())
6185 .max()
6186 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6190 }
6191
6192 fn tag_len_1rtt(&self) -> usize {
6193 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6194 Some(crypto) => Some(&*crypto.packet.local),
6195 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6196 };
6197 key.map_or(16, |x| x.tag_len())
6201 }
6202
6203 fn on_path_validated(&mut self, path_id: PathId) {
6205 self.path_data_mut(path_id).validated = true;
6206 let ConnectionSide::Server { server_config } = &self.side else {
6207 return;
6208 };
6209 let remote_addr = self.path_data(path_id).remote;
6210 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6211 new_tokens.clear();
6212 for _ in 0..server_config.validation_token.sent {
6213 new_tokens.push(remote_addr);
6214 }
6215 }
6216
6217 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6219 if let Some(path) = self.paths.get_mut(&path_id) {
6220 path.data.status.remote_update(status, status_seq_no);
6221 } else {
6222 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6223 }
6224 self.events.push_back(
6225 PathEvent::RemoteStatus {
6226 id: path_id,
6227 status,
6228 }
6229 .into(),
6230 );
6231 }
6232
6233 fn max_path_id(&self) -> Option<PathId> {
6242 if self.is_multipath_negotiated() {
6243 Some(self.remote_max_path_id.min(self.local_max_path_id))
6244 } else {
6245 None
6246 }
6247 }
6248
6249 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6251 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6252 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6253 };
6254 Ok(())
6255 }
6256
6257 pub fn remove_nat_traversal_address(
6261 &mut self,
6262 address: SocketAddr,
6263 ) -> Result<(), iroh_hp::Error> {
6264 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6265 self.spaces[SpaceId::Data]
6266 .pending
6267 .remove_address
6268 .insert(removed);
6269 }
6270 Ok(())
6271 }
6272
6273 pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6275 self.iroh_hp.get_local_nat_traversal_addresses()
6276 }
6277
6278 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6280 Ok(self
6281 .iroh_hp
6282 .client_side()?
6283 .get_remote_nat_traversal_addresses())
6284 }
6285
6286 fn open_nat_traversal_path(
6294 &mut self,
6295 now: Instant,
6296 (ip, port): (IpAddr, u16),
6297 ipv6: bool,
6298 ) -> Result<Option<(PathId, SocketAddr, bool)>, PathError> {
6299 let remote = match ip {
6301 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6302 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6303 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6304 IpAddr::V6(_) => {
6305 trace!("not using IPv6 nat candidate for IPv4 socket");
6306 return Ok(None);
6307 }
6308 };
6309 match self.open_path_ensure(remote, PathStatus::Backup, now) {
6310 Ok((path_id, path_was_known)) => {
6311 if path_was_known {
6312 trace!(%path_id, %remote, "nat traversal: path existed for remote");
6313 }
6314 Ok(Some((path_id, remote, path_was_known)))
6315 }
6316 Err(e) => {
6317 debug!(%remote, %e, "nat traversal: failed to probe remote");
6318 Err(e)
6319 }
6320 }
6321 }
6322
6323 pub fn initiate_nat_traversal_round(
6333 &mut self,
6334 now: Instant,
6335 ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6336 if self.state.is_closed() {
6337 return Err(iroh_hp::Error::Closed);
6338 }
6339
6340 let client_state = self.iroh_hp.client_side_mut()?;
6341 let iroh_hp::NatTraversalRound {
6342 new_round,
6343 reach_out_at,
6344 addresses_to_probe,
6345 prev_round_path_ids,
6346 } = client_state.initiate_nat_traversal_round()?;
6347
6348 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6349
6350 for path_id in prev_round_path_ids {
6351 let validated = self
6354 .path(path_id)
6355 .map(|path| path.validated)
6356 .unwrap_or(false);
6357
6358 if !validated {
6359 let _ = self.close_path(
6360 now,
6361 path_id,
6362 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6363 );
6364 }
6365 }
6366
6367 let mut err = None;
6368
6369 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6370 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6371 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6372
6373 for (id, address) in addresses_to_probe {
6374 match self.open_nat_traversal_path(now, address, ipv6) {
6375 Ok(None) => {}
6376 Ok(Some((path_id, remote, path_was_known))) => {
6377 if !path_was_known {
6378 path_ids.push(path_id);
6379 probed_addresses.push(remote);
6380 }
6381 }
6382 Err(e) => {
6383 self.iroh_hp
6384 .client_side_mut()
6385 .expect("validated")
6386 .report_in_continuation(id, e);
6387 err.get_or_insert(e);
6388 }
6389 }
6390 }
6391
6392 if let Some(err) = err {
6393 if probed_addresses.is_empty() {
6395 return Err(iroh_hp::Error::Multipath(err));
6396 }
6397 }
6398
6399 self.iroh_hp
6400 .client_side_mut()
6401 .expect("connection side validated")
6402 .set_round_path_ids(path_ids);
6403
6404 Ok(probed_addresses)
6405 }
6406
6407 fn continue_nat_traversal_round(&mut self, now: Instant) -> Option<bool> {
6412 let client_state = self.iroh_hp.client_side_mut().ok()?;
6413 let (id, address) = client_state.continue_nat_traversal_round()?;
6414 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6415 let open_result = self.open_nat_traversal_path(now, address, ipv6);
6416 let client_state = self.iroh_hp.client_side_mut().expect("validated");
6417 match open_result {
6418 Ok(None) => Some(true),
6419 Ok(Some((path_id, _remote, path_was_known))) => {
6420 if !path_was_known {
6421 client_state.add_round_path_id(path_id);
6422 }
6423 Some(true)
6424 }
6425 Err(e) => {
6426 client_state.report_in_continuation(id, e);
6427 Some(false)
6428 }
6429 }
6430 }
6431}
6432
6433impl fmt::Debug for Connection {
6434 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6435 f.debug_struct("Connection")
6436 .field("handshake_cid", &self.handshake_cid)
6437 .finish()
6438 }
6439}
6440
6441#[derive(Debug, Copy, Clone, PartialEq, Eq)]
6442enum PathBlocked {
6443 No,
6444 AntiAmplification,
6445 Congestion,
6446 Pacing,
6447}
6448
6449enum ConnectionSide {
6451 Client {
6452 token: Bytes,
6454 token_store: Arc<dyn TokenStore>,
6455 server_name: String,
6456 },
6457 Server {
6458 server_config: Arc<ServerConfig>,
6459 },
6460}
6461
6462impl ConnectionSide {
6463 fn remote_may_migrate(&self, state: &State) -> bool {
6464 match self {
6465 Self::Server { server_config } => server_config.migration,
6466 Self::Client { .. } => {
6467 if let Some(hs) = state.as_handshake() {
6468 hs.allow_server_migration
6469 } else {
6470 false
6471 }
6472 }
6473 }
6474 }
6475
6476 fn is_client(&self) -> bool {
6477 self.side().is_client()
6478 }
6479
6480 fn is_server(&self) -> bool {
6481 self.side().is_server()
6482 }
6483
6484 fn side(&self) -> Side {
6485 match *self {
6486 Self::Client { .. } => Side::Client,
6487 Self::Server { .. } => Side::Server,
6488 }
6489 }
6490}
6491
6492impl From<SideArgs> for ConnectionSide {
6493 fn from(side: SideArgs) -> Self {
6494 match side {
6495 SideArgs::Client {
6496 token_store,
6497 server_name,
6498 } => Self::Client {
6499 token: token_store.take(&server_name).unwrap_or_default(),
6500 token_store,
6501 server_name,
6502 },
6503 SideArgs::Server {
6504 server_config,
6505 pref_addr_cid: _,
6506 path_validated: _,
6507 } => Self::Server { server_config },
6508 }
6509 }
6510}
6511
6512pub(crate) enum SideArgs {
6514 Client {
6515 token_store: Arc<dyn TokenStore>,
6516 server_name: String,
6517 },
6518 Server {
6519 server_config: Arc<ServerConfig>,
6520 pref_addr_cid: Option<ConnectionId>,
6521 path_validated: bool,
6522 },
6523}
6524
6525impl SideArgs {
6526 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6527 match *self {
6528 Self::Client { .. } => None,
6529 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6530 }
6531 }
6532
6533 pub(crate) fn path_validated(&self) -> bool {
6534 match *self {
6535 Self::Client { .. } => true,
6536 Self::Server { path_validated, .. } => path_validated,
6537 }
6538 }
6539
6540 pub(crate) fn side(&self) -> Side {
6541 match *self {
6542 Self::Client { .. } => Side::Client,
6543 Self::Server { .. } => Side::Server,
6544 }
6545 }
6546}
6547
6548#[derive(Debug, Error, Clone, PartialEq, Eq)]
6550pub enum ConnectionError {
6551 #[error("peer doesn't implement any supported version")]
6553 VersionMismatch,
6554 #[error(transparent)]
6556 TransportError(#[from] TransportError),
6557 #[error("aborted by peer: {0}")]
6559 ConnectionClosed(frame::ConnectionClose),
6560 #[error("closed by peer: {0}")]
6562 ApplicationClosed(frame::ApplicationClose),
6563 #[error("reset by peer")]
6565 Reset,
6566 #[error("timed out")]
6572 TimedOut,
6573 #[error("closed")]
6575 LocallyClosed,
6576 #[error("CIDs exhausted")]
6580 CidsExhausted,
6581}
6582
6583impl From<Close> for ConnectionError {
6584 fn from(x: Close) -> Self {
6585 match x {
6586 Close::Connection(reason) => Self::ConnectionClosed(reason),
6587 Close::Application(reason) => Self::ApplicationClosed(reason),
6588 }
6589 }
6590}
6591
6592impl From<ConnectionError> for io::Error {
6594 fn from(x: ConnectionError) -> Self {
6595 use ConnectionError::*;
6596 let kind = match x {
6597 TimedOut => io::ErrorKind::TimedOut,
6598 Reset => io::ErrorKind::ConnectionReset,
6599 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6600 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6601 io::ErrorKind::Other
6602 }
6603 };
6604 Self::new(kind, x)
6605 }
6606}
6607
6608#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6611pub enum PathError {
6612 #[error("multipath extension not negotiated")]
6614 MultipathNotNegotiated,
6615 #[error("the server side may not open a path")]
6617 ServerSideNotAllowed,
6618 #[error("maximum number of concurrent paths reached")]
6620 MaxPathIdReached,
6621 #[error("remoted CIDs exhausted")]
6623 RemoteCidsExhausted,
6624 #[error("path validation failed")]
6626 ValidationFailed,
6627 #[error("invalid remote address")]
6629 InvalidRemoteAddress(SocketAddr),
6630}
6631
6632#[derive(Debug, Error, Clone, Eq, PartialEq)]
6634pub enum ClosePathError {
6635 #[error("closed path")]
6637 ClosedPath,
6638 #[error("last open path")]
6640 LastOpenPath,
6641}
6642
6643#[derive(Debug, Error, Clone, Copy)]
6644#[error("Multipath extension not negotiated")]
6645pub struct MultipathNotNegotiated {
6646 _private: (),
6647}
6648
6649#[derive(Debug)]
6651pub enum Event {
6652 HandshakeDataReady,
6654 Connected,
6656 HandshakeConfirmed,
6658 ConnectionLost {
6662 reason: ConnectionError,
6664 },
6665 Stream(StreamEvent),
6667 DatagramReceived,
6669 DatagramsUnblocked,
6671 Path(PathEvent),
6673 NatTraversal(iroh_hp::Event),
6675}
6676
6677impl From<PathEvent> for Event {
6678 fn from(source: PathEvent) -> Self {
6679 Self::Path(source)
6680 }
6681}
6682
6683fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6684 Duration::from_micros(params.max_ack_delay.0 * 1000)
6685}
6686
6687const MAX_BACKOFF_EXPONENT: u32 = 16;
6689
6690const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6698
6699const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6705 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6706
6707const KEY_UPDATE_MARGIN: u64 = 10_000;
6711
6712#[derive(Default)]
6713struct SentFrames {
6714 retransmits: ThinRetransmits,
6715 largest_acked: FxHashMap<PathId, u64>,
6717 stream_frames: StreamMetaVec,
6718 non_retransmits: bool,
6720 requires_padding: bool,
6722}
6723
6724impl SentFrames {
6725 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6727 !self.largest_acked.is_empty()
6728 && !self.non_retransmits
6729 && self.stream_frames.is_empty()
6730 && self.retransmits.is_empty(streams)
6731 }
6732}
6733
6734fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6742 match (x, y) {
6743 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6744 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6745 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6746 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6747 }
6748}
6749
6750#[cfg(test)]
6751mod tests {
6752 use super::*;
6753
6754 #[test]
6755 fn negotiate_max_idle_timeout_commutative() {
6756 let test_params = [
6757 (None, None, None),
6758 (None, Some(VarInt(0)), None),
6759 (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
6760 (Some(VarInt(0)), Some(VarInt(0)), None),
6761 (
6762 Some(VarInt(2)),
6763 Some(VarInt(0)),
6764 Some(Duration::from_millis(2)),
6765 ),
6766 (
6767 Some(VarInt(1)),
6768 Some(VarInt(4)),
6769 Some(Duration::from_millis(1)),
6770 ),
6771 ];
6772
6773 for (left, right, result) in test_params {
6774 assert_eq!(negotiate_max_idle_timeout(left, right), result);
6775 assert_eq!(negotiate_max_idle_timeout(right, left), result);
6776 }
6777 }
6778}