1use std::{
2 cmp,
3 collections::{BTreeMap, VecDeque, btree_map},
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 num::NonZeroU32,
8 ops::Not,
9 sync::Arc,
10};
11
12use bytes::{BufMut, Bytes, BytesMut};
13use frame::StreamMetaVec;
14
15use rand::{Rng, SeedableRng, rngs::StdRng};
16use rustc_hash::{FxHashMap, FxHashSet};
17use thiserror::Error;
18use tracing::{debug, error, trace, trace_span, warn};
19
20use crate::{
21 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
22 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
23 TransportErrorCode, VarInt,
24 cid_generator::ConnectionIdGenerator,
25 cid_queue::CidQueue,
26 coding::BufMutExt,
27 config::{ServerConfig, TransportConfig},
28 congestion::Controller,
29 connection::{
30 qlog::{QlogRecvPacket, QlogSentPacket, QlogSink},
31 spaces::LostPacket,
32 timer::{ConnTimer, PathTimer},
33 },
34 crypto::{self, KeyPair, Keys, PacketKey},
35 frame::{self, Close, Datagram, FrameStruct, NewToken, ObservedAddr},
36 iroh_hp,
37 packet::{
38 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
39 PacketNumber, PartialDecode, SpaceId,
40 },
41 range_set::ArrayRangeSet,
42 shared::{
43 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
44 EndpointEvent, EndpointEventInner,
45 },
46 token::{ResetToken, Token, TokenPayload},
47 transport_parameters::TransportParameters,
48};
49
50mod ack_frequency;
51use ack_frequency::AckFrequencyState;
52
53mod assembler;
54pub use assembler::Chunk;
55
56mod cid_state;
57use cid_state::CidState;
58
59mod datagrams;
60use datagrams::DatagramState;
61pub use datagrams::{Datagrams, SendDatagramError};
62
63mod mtud;
64mod pacing;
65
66mod packet_builder;
67use packet_builder::{PacketBuilder, PadDatagram};
68
69mod packet_crypto;
70use packet_crypto::{PrevCrypto, ZeroRttCrypto};
71
72mod paths;
73pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPathStatusError};
74use paths::{PathData, PathState};
75
76pub(crate) mod qlog;
77pub(crate) mod send_buffer;
78
79mod spaces;
80#[cfg(fuzzing)]
81pub use spaces::Retransmits;
82#[cfg(not(fuzzing))]
83use spaces::Retransmits;
84use spaces::{PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
85
86mod stats;
87pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
88
89mod streams;
90#[cfg(fuzzing)]
91pub use streams::StreamsState;
92#[cfg(not(fuzzing))]
93use streams::StreamsState;
94pub use streams::{
95 Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, SendStream,
96 ShouldTransmit, StreamEvent, Streams, WriteError, Written,
97};
98
99mod timer;
100use timer::{Timer, TimerTable};
101
102mod transmit_buf;
103use transmit_buf::TransmitBuf;
104
105mod state;
106
107#[cfg(not(fuzzing))]
108use state::State;
109#[cfg(fuzzing)]
110pub use state::State;
111use state::StateType;
112
/// Protocol state for a single connection, including the state of each of its paths.
///
/// NOTE(review): the field notes below describe how the visible parts of this file
/// initialise and use each field; anything marked "presumably" is an assumption that
/// should be confirmed against the rest of the crate.
pub struct Connection {
    /// Shared endpoint-level configuration handed to `Connection::new`.
    endpoint_config: Arc<EndpointConfig>,
    /// Shared transport configuration; consulted e.g. for the idle timeout, spin bit,
    /// segmentation offload and padding decisions.
    config: Arc<TransportConfig>,
    /// Random number generator, seeded from the `rng_seed` passed to `Connection::new`.
    rng: StdRng,
    /// The crypto session; `Connection::new` uses it to derive the Initial keys.
    crypto: Box<dyn crypto::Session>,
    /// Initialised from the `loc_cid` argument of `Connection::new`
    /// (presumably the CID we chose for use during the handshake).
    handshake_cid: ConnectionId,
    /// Initialised from the `rem_cid` argument of `Connection::new`
    /// (presumably the CID the peer chose for use during the handshake).
    rem_handshake_cid: ConnectionId,
    /// Local IP address, if known; used as the `src_ip` of outgoing transmits.
    local_ip: Option<IpAddr>,
    /// Per-path state, keyed by path id.
    ///
    /// This is an ordered map: `poll_transmit` starts at the first key and walks the
    /// paths in ascending id order.
    paths: BTreeMap<PathId, PathState>,
    /// Incremented (wrapping) every time `ensure_path` creates a new path; the new value
    /// is passed on to `PathData::new`.
    path_counter: u64,
    /// Passed to `PathData::new` for every path (presumably whether MTU discovery is
    /// permitted in this environment).
    allow_mtud: bool,
    /// The connection state machine (handshake, established, closed, draining, drained).
    state: State,
    /// Side-specific (client or server) connection data.
    side: ConnectionSide,
    /// Starts out `false` (presumably set once 0-RTT is enabled during the handshake).
    zero_rtt_enabled: bool,
    /// 0-RTT keys, if any; starts out `None`.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    /// Current key phase bit; starts out `false`.
    key_phase: bool,
    /// Initialised to a random value in `10..1000` (presumably the number of packets
    /// allowed in the current key phase).
    key_phase_size: u64,
    /// The peer's transport parameters; starts out as the defaults.
    peer_params: TransportParameters,
    /// Initialised from the `rem_cid` argument of `Connection::new`.
    orig_rem_cid: ConnectionId,
    /// Initialised from the `init_cid` argument of `Connection::new`.
    initial_dst_cid: ConnectionId,
    /// Starts out `None` (presumably the source CID of a received Retry packet).
    retry_src_cid: Option<ConnectionId>,
    /// Application-facing events, drained by `Connection::poll`.
    events: VecDeque<Event>,
    /// Events for the endpoint, drained by `Connection::poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Whether the spin bit is in use: requires `config.allow_spin` and is additionally
    /// enabled only with probability 7/8.
    spin_enabled: bool,
    /// Current spin bit value; starts out `false`.
    spin: bool,
    /// The packet spaces, in the order Initial, Handshake, Data.
    spaces: [PacketSpace; 3],
    /// Starts at `SpaceId::Initial` (presumably the highest space keys are available for).
    highest_space: SpaceId,
    /// Starts out `None` (presumably the keys in use before the latest key update).
    prev_crypto: Option<PrevCrypto>,
    /// Starts out `None` (presumably the keys prepared for the next key update).
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Starts out `false` (presumably whether the peer accepted 0-RTT data).
    accepted_0rtt: bool,
    /// Starts out `true` (presumably whether sending may reset the idle timer).
    permit_idle_reset: bool,
    /// Idle timeout derived from `config.max_idle_timeout` (milliseconds); `None` when the
    /// config value is unset or zero.
    idle_timeout: Option<Duration>,
    /// Table of pending connection and per-path timers.
    timers: TimerTable,
    /// Starts at zero (presumably counts packets that failed authentication).
    authentication_failures: u64,

    /// Whether a CONNECTION_CLOSE still needs to be sent; `poll_transmit` clears it once
    /// the close has been written in the highest packet space.
    close: bool,

    /// ACK frequency state, initialised from the default transport parameters.
    ack_frequency: AckFrequencyState,

    /// Passed to ACK population when building packets (presumably whether ECN
    /// information is being received from the network).
    receiving_ecn: bool,
    /// Starts at zero (presumably the number of successfully authenticated packets).
    total_authed_packets: u64,
    /// Set by `poll_transmit` when it produced nothing to send and was not blocked by
    /// congestion control.
    app_limited: bool,

    /// Starts at zero (presumably the sequence number for the next observed-address
    /// report).
    next_observed_addr_seq_no: VarInt,

    /// State of all streams on this connection.
    streams: StreamsState,
    /// Per-path queues of connection IDs for the remote; the queue's active CID is used
    /// as the destination CID when sending on that path.
    rem_cids: FxHashMap<PathId, CidQueue>,
    /// Per-path state of locally issued connection IDs.
    local_cid_state: FxHashMap<PathId, CidState>,
    /// State of the datagram extension.
    datagrams: DatagramState,
    /// Connection-level statistics.
    stats: ConnectionStats,
    /// Per-path statistics.
    path_stats: FxHashMap<PathId, PathStats>,
    /// The QUIC version passed to `Connection::new`.
    version: u32,

    /// Starts at one (`NonZeroU32::MIN`).
    max_concurrent_paths: NonZeroU32,
    /// Starts at `PathId::ZERO`; raised by one each time `close_path` abandons a path.
    local_max_path_id: PathId,
    /// Starts at `PathId::ZERO`; `open_path` refuses to use path ids above this value.
    remote_max_path_id: PathId,
    /// Starts at `PathId::ZERO` (presumably the highest path id for which CIDs were issued).
    max_path_id_with_cids: PathId,
    /// Paths abandoned via `close_path`; they remain in `paths` until they are discarded.
    abandoned_paths: FxHashSet<PathId>,

    /// Hole-punching state; its server side supplies probes that `poll_transmit` sends.
    iroh_hp: iroh_hp::State,
    /// Sink for qlog events.
    qlog: QlogSink,
}
320
321impl Connection {
/// Builds the state for a new connection with a single initial path (`PathId::ZERO`)
/// towards `remote`.
///
/// * `init_cid` is used to derive the Initial keys and stored as `initial_dst_cid`.
/// * `loc_cid` / `rem_cid` become the local and remote handshake CIDs; `rem_cid` also
///   seeds the remote CID queue of path zero.
/// * `rng_seed` seeds the connection's RNG. The random draws below happen in a fixed
///   order, so the statement and field-initialiser order must not be changed casually.
/// * `side_args` determines the side (client/server), whether the initial path counts as
///   validated, and whether a preferred-address CID exists.
///
/// On the client side the first crypto data is written and 0-RTT is initialised before
/// returning.
pub(crate) fn new(
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    init_cid: ConnectionId,
    loc_cid: ConnectionId,
    rem_cid: ConnectionId,
    remote: SocketAddr,
    local_ip: Option<IpAddr>,
    crypto: Box<dyn crypto::Session>,
    cid_gen: &dyn ConnectionIdGenerator,
    now: Instant,
    version: u32,
    allow_mtud: bool,
    rng_seed: [u8; 32],
    side_args: SideArgs,
    qlog: QlogSink,
) -> Self {
    // Extract everything needed from `side_args` before it is consumed below.
    let pref_addr_cid = side_args.pref_addr_cid();
    let path_validated = side_args.path_validated();
    let connection_side = ConnectionSide::from(side_args);
    let side = connection_side.side();
    let mut rng = StdRng::from_seed(rng_seed);
    // Only the Initial space has keys from the start; they are derived from `init_cid`.
    let initial_space = {
        let mut space = PacketSpace::new(now, SpaceId::Initial, &mut rng);
        space.crypto = Some(crypto.initial_keys(init_cid, side));
        space
    };
    let handshake_space = PacketSpace::new(now, SpaceId::Handshake, &mut rng);
    // Tests may request a deterministic data space, which is built without the RNG.
    #[cfg(test)]
    let data_space = match config.deterministic_packet_numbers {
        true => PacketSpace::new_deterministic(now, SpaceId::Data),
        false => PacketSpace::new(now, SpaceId::Data, &mut rng),
    };
    #[cfg(not(test))]
    let data_space = PacketSpace::new(now, SpaceId::Data, &mut rng);
    let state = State::handshake(state::Handshake {
        rem_cid_set: side.is_server(),
        expected_token: Bytes::new(),
        client_hello: None,
        allow_server_migration: side.is_client(),
    });
    let local_cid_state = FxHashMap::from_iter([(
        PathId::ZERO,
        CidState::new(
            cid_gen.cid_len(),
            cid_gen.cid_lifetime(),
            now,
            // One more CID is accounted for when a preferred-address CID exists.
            if pref_addr_cid.is_some() { 2 } else { 1 },
        ),
    )]);

    // The initial path has no peer-advertised payload size yet and uses generation 0.
    let mut path = PathData::new(remote, allow_mtud, None, 0, now, &config);
    path.open = true;
    let mut this = Self {
        endpoint_config,
        crypto,
        handshake_cid: loc_cid,
        rem_handshake_cid: rem_cid,
        local_cid_state,
        paths: BTreeMap::from_iter([(
            PathId::ZERO,
            PathState {
                data: path,
                prev: None,
            },
        )]),
        path_counter: 0,
        allow_mtud,
        local_ip,
        state,
        side: connection_side,
        zero_rtt_enabled: false,
        zero_rtt_crypto: None,
        key_phase: false,
        // Random draw: field initialisers run in source order, so this happens before
        // the `spin_enabled` draw below.
        key_phase_size: rng.random_range(10..1000),
        peer_params: TransportParameters::default(),
        orig_rem_cid: rem_cid,
        initial_dst_cid: init_cid,
        retry_src_cid: None,
        events: VecDeque::new(),
        endpoint_events: VecDeque::new(),
        // Even when allowed by the config, the spin bit is only enabled 7 out of 8 times.
        spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
        spin: false,
        spaces: [initial_space, handshake_space, data_space],
        highest_space: SpaceId::Initial,
        prev_crypto: None,
        next_crypto: None,
        accepted_0rtt: false,
        permit_idle_reset: true,
        // An unset or zero `max_idle_timeout` means no idle timeout; otherwise the value
        // is interpreted as milliseconds.
        idle_timeout: match config.max_idle_timeout {
            None | Some(VarInt(0)) => None,
            Some(dur) => Some(Duration::from_millis(dur.0)),
        },
        timers: TimerTable::default(),
        authentication_failures: 0,
        close: false,

        // The peer's parameters are not known yet, so start from the defaults.
        ack_frequency: AckFrequencyState::new(get_max_ack_delay(
            &TransportParameters::default(),
        )),

        app_limited: false,
        receiving_ecn: false,
        total_authed_packets: 0,

        next_observed_addr_seq_no: 0u32.into(),

        streams: StreamsState::new(
            side,
            config.max_concurrent_uni_streams,
            config.max_concurrent_bidi_streams,
            config.send_window,
            config.receive_window,
            config.stream_receive_window,
        ),
        datagrams: DatagramState::default(),
        // `config` is moved here, after its last use in the initialisers above.
        config,
        rem_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(rem_cid))]),
        // `rng` is moved here, after all random draws above.
        rng,
        stats: ConnectionStats::default(),
        path_stats: Default::default(),
        version,

        max_concurrent_paths: NonZeroU32::MIN,
        local_max_path_id: PathId::ZERO,
        remote_max_path_id: PathId::ZERO,
        max_path_id_with_cids: PathId::ZERO,
        abandoned_paths: Default::default(),

        iroh_hp: Default::default(),
        qlog,
    };
    if path_validated {
        this.on_path_validated(PathId::ZERO);
    }
    if side.is_client() {
        // Clients start the handshake: write the first crypto data and set up 0-RTT.
        this.write_crypto();
        this.init_0rtt(now);
    }
    this.qlog.emit_tuple_assigned(PathId::ZERO, remote, now);
    this
}
474
475 #[must_use]
483 pub fn poll_timeout(&mut self) -> Option<Instant> {
484 self.timers.peek()
485 }
486
487 #[must_use]
493 pub fn poll(&mut self) -> Option<Event> {
494 if let Some(x) = self.events.pop_front() {
495 return Some(x);
496 }
497
498 if let Some(event) = self.streams.poll() {
499 return Some(Event::Stream(event));
500 }
501
502 if let Some(reason) = self.state.take_error() {
503 return Some(Event::ConnectionLost { reason });
504 }
505
506 None
507 }
508
509 #[must_use]
511 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
512 self.endpoint_events.pop_front().map(EndpointEvent)
513 }
514
515 #[must_use]
517 pub fn streams(&mut self) -> Streams<'_> {
518 Streams {
519 state: &mut self.streams,
520 conn_state: &self.state,
521 }
522 }
523
524 #[must_use]
526 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
527 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
528 RecvStream {
529 id,
530 state: &mut self.streams,
531 pending: &mut self.spaces[SpaceId::Data].pending,
532 }
533 }
534
535 #[must_use]
537 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
538 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
539 SendStream {
540 id,
541 state: &mut self.streams,
542 pending: &mut self.spaces[SpaceId::Data].pending,
543 conn_state: &self.state,
544 }
545 }
546
547 pub fn open_path_ensure(
554 &mut self,
555 remote: SocketAddr,
556 initial_status: PathStatus,
557 now: Instant,
558 ) -> Result<(PathId, bool), PathError> {
559 match self
560 .paths
561 .iter()
562 .find(|(_id, path)| path.data.remote == remote)
563 {
564 Some((path_id, _state)) => Ok((*path_id, true)),
565 None => self
566 .open_path(remote, initial_status, now)
567 .map(|id| (id, false)),
568 }
569 }
570
571 pub fn open_path(
576 &mut self,
577 remote: SocketAddr,
578 initial_status: PathStatus,
579 now: Instant,
580 ) -> Result<PathId, PathError> {
581 if !self.is_multipath_negotiated() {
582 return Err(PathError::MultipathNotNegotiated);
583 }
584 if self.side().is_server() {
585 return Err(PathError::ServerSideNotAllowed);
586 }
587
588 let max_abandoned = self.abandoned_paths.iter().max().copied();
589 let max_used = self.paths.keys().last().copied();
590 let path_id = max_abandoned
591 .max(max_used)
592 .unwrap_or(PathId::ZERO)
593 .saturating_add(1u8);
594
595 if Some(path_id) > self.max_path_id() {
596 return Err(PathError::MaxPathIdReached);
597 }
598 if path_id > self.remote_max_path_id {
599 self.spaces[SpaceId::Data].pending.paths_blocked = true;
600 return Err(PathError::MaxPathIdReached);
601 }
602 if self.rem_cids.get(&path_id).map(CidQueue::active).is_none() {
603 self.spaces[SpaceId::Data]
604 .pending
605 .path_cids_blocked
606 .push(path_id);
607 return Err(PathError::RemoteCidsExhausted);
608 }
609
610 let path = self.ensure_path(path_id, remote, now, None);
611 path.status.local_update(initial_status);
612
613 Ok(path_id)
614 }
615
616 pub fn close_path(
622 &mut self,
623 now: Instant,
624 path_id: PathId,
625 error_code: VarInt,
626 ) -> Result<(), ClosePathError> {
627 if self.abandoned_paths.contains(&path_id)
628 || Some(path_id) > self.max_path_id()
629 || !self.paths.contains_key(&path_id)
630 {
631 return Err(ClosePathError::ClosedPath);
632 }
633 if self
634 .paths
635 .iter()
636 .any(|(id, path)| {
638 *id != path_id && !self.abandoned_paths.contains(id) && path.data.validated
639 })
640 .not()
641 {
642 return Err(ClosePathError::LastOpenPath);
643 }
644
645 self.spaces[SpaceId::Data]
647 .pending
648 .path_abandon
649 .insert(path_id, error_code.into());
650
651 let pending_space = &mut self.spaces[SpaceId::Data].pending;
653 pending_space.new_cids.retain(|cid| cid.path_id != path_id);
654 pending_space.path_cids_blocked.retain(|&id| id != path_id);
655 pending_space.path_status.retain(|&id| id != path_id);
656
657 for space in self.spaces[SpaceId::Data].iter_paths_mut() {
659 for sent_packet in space.sent_packets.values_mut() {
660 if let Some(retransmits) = sent_packet.retransmits.get_mut() {
661 retransmits.new_cids.retain(|cid| cid.path_id != path_id);
662 retransmits.path_cids_blocked.retain(|&id| id != path_id);
663 retransmits.path_status.retain(|&id| id != path_id);
664 }
665 }
666 }
667
668 self.rem_cids.remove(&path_id);
674 self.endpoint_events
675 .push_back(EndpointEventInner::RetireResetToken(path_id));
676
677 let pto = self.pto_max_path(SpaceId::Data);
678
679 let path = self.paths.get_mut(&path_id).expect("checked above");
680
681 path.data.last_allowed_receive = Some(now + 3 * pto);
683 self.abandoned_paths.insert(path_id);
684
685 self.set_max_path_id(now, self.local_max_path_id.saturating_add(1u8));
686
687 self.timers.set(
692 Timer::PerPath(path_id, PathTimer::DiscardPath),
693 now + 6 * pto,
694 self.qlog.with_time(now),
695 );
696 Ok(())
697 }
698
699 #[track_caller]
703 fn path_data(&self, path_id: PathId) -> &PathData {
704 if let Some(data) = self.paths.get(&path_id) {
705 &data.data
706 } else {
707 panic!(
708 "unknown path: {path_id}, currently known paths: {:?}",
709 self.paths.keys().collect::<Vec<_>>()
710 );
711 }
712 }
713
714 fn path(&self, path_id: PathId) -> Option<&PathData> {
716 self.paths.get(&path_id).map(|path_state| &path_state.data)
717 }
718
719 fn path_mut(&mut self, path_id: PathId) -> Option<&mut PathData> {
721 self.paths
722 .get_mut(&path_id)
723 .map(|path_state| &mut path_state.data)
724 }
725
726 pub fn paths(&self) -> Vec<PathId> {
730 self.paths.keys().copied().collect()
731 }
732
733 pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
735 self.path(path_id)
736 .map(PathData::local_status)
737 .ok_or(ClosedPath { _private: () })
738 }
739
740 pub fn path_remote_address(&self, path_id: PathId) -> Result<SocketAddr, ClosedPath> {
742 self.path(path_id)
743 .map(|path| path.remote)
744 .ok_or(ClosedPath { _private: () })
745 }
746
747 pub fn set_path_status(
751 &mut self,
752 path_id: PathId,
753 status: PathStatus,
754 ) -> Result<PathStatus, SetPathStatusError> {
755 if !self.is_multipath_negotiated() {
756 return Err(SetPathStatusError::MultipathNotNegotiated);
757 }
758 let path = self
759 .path_mut(path_id)
760 .ok_or(SetPathStatusError::ClosedPath)?;
761 let prev = match path.status.local_update(status) {
762 Some(prev) => {
763 self.spaces[SpaceId::Data]
764 .pending
765 .path_status
766 .insert(path_id);
767 prev
768 }
769 None => path.local_status(),
770 };
771 Ok(prev)
772 }
773
774 pub fn remote_path_status(&self, path_id: PathId) -> Option<PathStatus> {
779 self.path(path_id).and_then(|path| path.remote_status())
780 }
781
782 pub fn set_path_max_idle_timeout(
788 &mut self,
789 path_id: PathId,
790 timeout: Option<Duration>,
791 ) -> Result<Option<Duration>, ClosedPath> {
792 let path = self
793 .paths
794 .get_mut(&path_id)
795 .ok_or(ClosedPath { _private: () })?;
796 Ok(std::mem::replace(&mut path.data.idle_timeout, timeout))
797 }
798
799 pub fn set_path_keep_alive_interval(
805 &mut self,
806 path_id: PathId,
807 interval: Option<Duration>,
808 ) -> Result<Option<Duration>, ClosedPath> {
809 let path = self
810 .paths
811 .get_mut(&path_id)
812 .ok_or(ClosedPath { _private: () })?;
813 Ok(std::mem::replace(&mut path.data.keep_alive, interval))
814 }
815
816 #[track_caller]
820 fn path_data_mut(&mut self, path_id: PathId) -> &mut PathData {
821 &mut self.paths.get_mut(&path_id).expect("known path").data
822 }
823
824 fn is_remote_validated(&self, remote: SocketAddr) -> bool {
826 self.paths
827 .values()
828 .any(|path_state| path_state.data.remote == remote && path_state.data.validated)
829 }
832
/// Returns the [`PathData`] for `path_id`, creating the path first if it does not exist.
///
/// If the path already exists it is returned unchanged and `remote`, `now` and `pn` are
/// ignored. Otherwise a new path towards `remote` is created which:
/// * is marked validated if some existing path to the same remote is already validated,
/// * has its path-open timer armed at 3 PTOs from `now`,
/// * is flagged to send a new path challenge,
/// * gets a fresh packet number space in the data space; if `pn` is given, that packet
///   number is recorded in the new space's dedup set.
fn ensure_path(
    &mut self,
    path_id: PathId,
    remote: SocketAddr,
    now: Instant,
    pn: Option<u64>,
) -> &mut PathData {
    // Must be computed before taking the map entry below, which borrows `self.paths`.
    let validated = self.is_remote_validated(remote);
    let vacant_entry = match self.paths.entry(path_id) {
        btree_map::Entry::Vacant(vacant_entry) => vacant_entry,
        btree_map::Entry::Occupied(occupied_entry) => {
            // Existing path: nothing to set up.
            return &mut occupied_entry.into_mut().data;
        }
    };

    debug!(%validated, %path_id, ?remote, "path added");
    // Clamp the peer's advertised maximum UDP payload size to what fits into a `u16`.
    let peer_max_udp_payload_size =
        u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
    self.path_counter = self.path_counter.wrapping_add(1);
    let mut data = PathData::new(
        remote,
        self.allow_mtud,
        Some(peer_max_udp_payload_size),
        self.path_counter,
        now,
        &self.config,
    );

    data.validated = validated;

    // PTO estimate for the new path: its own RTT-based PTO plus the max ACK delay.
    let pto = self.ack_frequency.max_ack_delay_for_pto() + data.rtt.pto_base();
    self.timers.set(
        Timer::PerPath(path_id, PathTimer::PathOpen),
        now + 3 * pto,
        self.qlog.with_time(now),
    );

    data.send_new_challenge = true;

    let path = vacant_entry.insert(PathState { data, prev: None });

    let mut pn_space = spaces::PacketNumberSpace::new(now, SpaceId::Data, &mut self.rng);
    if let Some(pn) = pn {
        // Record the given packet number in the dedup set of the new space.
        pn_space.dedup.insert(pn);
    }
    self.spaces[SpaceId::Data]
        .number_spaces
        .insert(path_id, pn_space);
    self.qlog.emit_tuple_assigned(path_id, remote, now);
    &mut path.data
}
886
887 #[must_use]
897 pub fn poll_transmit(
898 &mut self,
899 now: Instant,
900 max_datagrams: usize,
901 buf: &mut Vec<u8>,
902 ) -> Option<Transmit> {
903 if let Some(probing) = self
904 .iroh_hp
905 .server_side_mut()
906 .ok()
907 .and_then(iroh_hp::ServerState::next_probe)
908 {
909 let destination = probing.remote();
910 trace!(%destination, "RAND_DATA packet");
911 let token: u64 = self.rng.random();
912 buf.put_u64(token);
913 probing.finish(token);
914 return Some(Transmit {
915 destination,
916 ecn: None,
917 size: 8,
918 segment_size: None,
919 src_ip: None,
920 });
921 }
922
923 assert!(max_datagrams != 0);
924 let max_datagrams = match self.config.enable_segmentation_offload {
925 false => 1,
926 true => max_datagrams,
927 };
928
929 let close = match self.state.as_type() {
948 StateType::Drained => {
949 self.app_limited = true;
950 return None;
951 }
952 StateType::Draining | StateType::Closed => {
953 if !self.close {
956 self.app_limited = true;
957 return None;
958 }
959 true
960 }
961 _ => false,
962 };
963
964 if let Some(config) = &self.config.ack_frequency_config {
966 let rtt = self
967 .paths
968 .values()
969 .map(|p| p.data.rtt.get())
970 .min()
971 .expect("one path exists");
972 self.spaces[SpaceId::Data].pending.ack_frequency = self
973 .ack_frequency
974 .should_send_ack_frequency(rtt, config, &self.peer_params)
975 && self.highest_space == SpaceId::Data
976 && self.peer_supports_ack_frequency();
977 }
978
979 let mut coalesce = true;
981
982 let mut pad_datagram = PadDatagram::No;
985
986 let mut congestion_blocked = false;
990
991 let mut last_packet_number = None;
993
994 let mut path_id = *self.paths.first_key_value().expect("one path must exist").0;
995
996 let have_available_path = self.paths.iter().any(|(id, path)| {
999 path.data.validated
1000 && path.data.local_status() == PathStatus::Available
1001 && self.rem_cids.contains_key(id)
1002 });
1003
1004 let mut transmit = TransmitBuf::new(
1006 buf,
1007 max_datagrams,
1008 self.path_data(path_id).current_mtu().into(),
1009 );
1010 if let Some(challenge) = self.send_prev_path_challenge(now, &mut transmit, path_id) {
1011 return Some(challenge);
1012 }
1013 let mut space_id = match path_id {
1014 PathId::ZERO => SpaceId::Initial,
1015 _ => SpaceId::Data,
1016 };
1017
1018 loop {
1019 let Some(remote_cid) = self.rem_cids.get(&path_id).map(CidQueue::active) else {
1021 let err = PathError::RemoteCidsExhausted;
1022 if !self.abandoned_paths.contains(&path_id) {
1023 debug!(?err, %path_id, "no active CID for path");
1024 self.events.push_back(Event::Path(PathEvent::LocallyClosed {
1025 id: path_id,
1026 error: err,
1027 }));
1028 self.close_path(
1032 now,
1033 path_id,
1034 TransportErrorCode::NO_CID_AVAILABLE_FOR_PATH.into(),
1035 )
1036 .ok();
1037 self.spaces[SpaceId::Data]
1038 .pending
1039 .path_cids_blocked
1040 .push(path_id);
1041 } else {
1042 trace!(%path_id, "remote CIDs retired for abandoned path");
1043 }
1044
1045 match self.paths.keys().find(|&&next| next > path_id) {
1046 Some(next_path_id) => {
1047 path_id = *next_path_id;
1049 space_id = SpaceId::Data;
1050
1051 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1053 if let Some(challenge) =
1054 self.send_prev_path_challenge(now, &mut transmit, path_id)
1055 {
1056 return Some(challenge);
1057 }
1058
1059 continue;
1060 }
1061 None => {
1062 trace!(
1064 ?space_id,
1065 %path_id,
1066 "no CIDs to send on path, no more paths"
1067 );
1068 break;
1069 }
1070 }
1071 };
1072
1073 let max_packet_size = if transmit.datagram_remaining_mut() > 0 {
1076 transmit.datagram_remaining_mut()
1078 } else {
1079 transmit.segment_size()
1081 };
1082 let can_send = self.space_can_send(space_id, path_id, max_packet_size, close);
1083 let path_should_send = {
1084 let path_exclusive_only = space_id == SpaceId::Data
1085 && have_available_path
1086 && self.path_data(path_id).local_status() == PathStatus::Backup;
1087 let path_should_send = if path_exclusive_only {
1088 can_send.path_exclusive
1089 } else {
1090 !can_send.is_empty()
1091 };
1092 let needs_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1093 path_should_send || needs_loss_probe || can_send.close
1094 };
1095
1096 if !path_should_send && space_id < SpaceId::Data {
1097 if self.spaces[space_id].crypto.is_some() {
1098 trace!(?space_id, %path_id, "nothing to send in space");
1099 }
1100 space_id = space_id.next();
1101 continue;
1102 }
1103
1104 let send_blocked = if path_should_send && transmit.datagram_remaining_mut() == 0 {
1105 self.path_congestion_check(space_id, path_id, &transmit, &can_send, now)
1107 } else {
1108 PathBlocked::No
1109 };
1110 if send_blocked != PathBlocked::No {
1111 trace!(?space_id, %path_id, ?send_blocked, "congestion blocked");
1112 congestion_blocked = true;
1113 }
1114 if send_blocked != PathBlocked::No && space_id < SpaceId::Data {
1115 space_id = space_id.next();
1118 continue;
1119 }
1120 if !path_should_send || send_blocked != PathBlocked::No {
1121 if transmit.num_datagrams() > 0 {
1126 break;
1127 }
1128
1129 match self.paths.keys().find(|&&next| next > path_id) {
1130 Some(next_path_id) => {
1131 trace!(
1133 ?space_id,
1134 %path_id,
1135 %next_path_id,
1136 "nothing to send on path"
1137 );
1138 path_id = *next_path_id;
1139 space_id = SpaceId::Data;
1140
1141 transmit.set_segment_size(self.path_data(path_id).current_mtu().into());
1143 if let Some(challenge) =
1144 self.send_prev_path_challenge(now, &mut transmit, path_id)
1145 {
1146 return Some(challenge);
1147 }
1148
1149 continue;
1150 }
1151 None => {
1152 trace!(
1154 ?space_id,
1155 %path_id,
1156 next_path_id=?None::<PathId>,
1157 "nothing to send on path"
1158 );
1159 break;
1160 }
1161 }
1162 }
1163
1164 if transmit.datagram_remaining_mut() == 0 {
1166 if transmit.num_datagrams() >= transmit.max_datagrams() {
1167 break;
1169 }
1170
1171 match self.spaces[space_id].for_path(path_id).loss_probes {
1172 0 => transmit.start_new_datagram(),
1173 _ => {
1174 let request_immediate_ack =
1176 space_id == SpaceId::Data && self.peer_supports_ack_frequency();
1177 self.spaces[space_id].maybe_queue_probe(
1178 path_id,
1179 request_immediate_ack,
1180 &self.streams,
1181 );
1182
1183 self.spaces[space_id].for_path(path_id).loss_probes -= 1;
1184
1185 transmit.start_new_datagram_with_size(std::cmp::min(
1189 usize::from(INITIAL_MTU),
1190 transmit.segment_size(),
1191 ));
1192 }
1193 }
1194 trace!(count = transmit.num_datagrams(), "new datagram started");
1195 coalesce = true;
1196 pad_datagram = PadDatagram::No;
1197 }
1198
1199 if transmit.datagram_start_offset() < transmit.len() {
1202 debug_assert!(transmit.datagram_remaining_mut() >= MIN_PACKET_SPACE);
1203 }
1204
1205 if self.spaces[SpaceId::Initial].crypto.is_some()
1210 && space_id == SpaceId::Handshake
1211 && self.side.is_client()
1212 {
1213 self.discard_space(now, SpaceId::Initial);
1216 }
1217 if let Some(ref mut prev) = self.prev_crypto {
1218 prev.update_unacked = false;
1219 }
1220
1221 let mut qlog = QlogSentPacket::default();
1222 let mut builder = PacketBuilder::new(
1223 now,
1224 space_id,
1225 path_id,
1226 remote_cid,
1227 &mut transmit,
1228 can_send.other,
1229 self,
1230 &mut qlog,
1231 )?;
1232 last_packet_number = Some(builder.exact_number);
1233 coalesce = coalesce && !builder.short_header;
1234
1235 if space_id == SpaceId::Initial && (self.side.is_client() || can_send.other) {
1236 pad_datagram |= PadDatagram::ToMinMtu;
1238 }
1239 if space_id == SpaceId::Data && self.config.pad_to_mtu {
1240 pad_datagram |= PadDatagram::ToSegmentSize;
1241 }
1242
1243 if can_send.close {
1244 trace!("sending CONNECTION_CLOSE");
1245 let mut sent_frames = SentFrames::default();
1250 let is_multipath_negotiated = self.is_multipath_negotiated();
1251 for path_id in self.spaces[space_id]
1252 .number_spaces
1253 .iter()
1254 .filter(|(_, pns)| !pns.pending_acks.ranges().is_empty())
1255 .map(|(&path_id, _)| path_id)
1256 .collect::<Vec<_>>()
1257 {
1258 Self::populate_acks(
1259 now,
1260 self.receiving_ecn,
1261 &mut sent_frames,
1262 path_id,
1263 space_id,
1264 &mut self.spaces[space_id],
1265 is_multipath_negotiated,
1266 &mut builder.frame_space_mut(),
1267 &mut self.stats,
1268 &mut qlog,
1269 );
1270 }
1271
1272 debug_assert!(
1276 builder.frame_space_remaining() > frame::ConnectionClose::SIZE_BOUND,
1277 "ACKs should leave space for ConnectionClose"
1278 );
1279 if frame::ConnectionClose::SIZE_BOUND < builder.frame_space_remaining() {
1280 let max_frame_size = builder.frame_space_remaining();
1281 match self.state.as_type() {
1282 StateType::Closed => {
1283 let reason: Close =
1284 self.state.as_closed().expect("checked").clone().into();
1285 if space_id == SpaceId::Data || reason.is_transport_layer() {
1286 reason.encode(&mut builder.frame_space_mut(), max_frame_size);
1287 qlog.frame(&Frame::Close(reason));
1288 } else {
1289 let frame = frame::ConnectionClose {
1290 error_code: TransportErrorCode::APPLICATION_ERROR,
1291 frame_type: None,
1292 reason: Bytes::new(),
1293 };
1294 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1295 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1296 }
1297 }
1298 StateType::Draining => {
1299 let frame = frame::ConnectionClose {
1300 error_code: TransportErrorCode::NO_ERROR,
1301 frame_type: None,
1302 reason: Bytes::new(),
1303 };
1304 frame.encode(&mut builder.frame_space_mut(), max_frame_size);
1305 qlog.frame(&Frame::Close(frame::Close::Connection(frame)));
1306 }
1307 _ => unreachable!(
1308 "tried to make a close packet when the connection wasn't closed"
1309 ),
1310 };
1311 }
1312 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1313 if space_id == self.highest_space {
1314 self.close = false;
1317 break;
1319 } else {
1320 space_id = space_id.next();
1324 continue;
1325 }
1326 }
1327
1328 if space_id == SpaceId::Data && builder.buf.num_datagrams() == 1 {
1331 let path = self.path_data_mut(path_id);
1332 if let Some((token, remote)) = path.path_responses.pop_off_path(path.remote) {
1333 let response = frame::PathResponse(token);
1337 trace!(%response, "(off-path)");
1338 builder.frame_space_mut().write(response);
1339 qlog.frame(&Frame::PathResponse(response));
1340 self.stats.frame_tx.path_response += 1;
1341 builder.finish_and_track(
1342 now,
1343 self,
1344 path_id,
1345 SentFrames {
1346 non_retransmits: true,
1347 ..SentFrames::default()
1348 },
1349 PadDatagram::ToMinMtu,
1350 qlog,
1351 );
1352 self.stats.udp_tx.on_sent(1, transmit.len());
1353 return Some(Transmit {
1354 destination: remote,
1355 size: transmit.len(),
1356 ecn: None,
1357 segment_size: None,
1358 src_ip: self.local_ip,
1359 });
1360 }
1361 }
1362
1363 let sent_frames = {
1364 let path_exclusive_only = have_available_path
1365 && self.path_data(path_id).local_status() == PathStatus::Backup;
1366 let pn = builder.exact_number;
1367 self.populate_packet(
1368 now,
1369 space_id,
1370 path_id,
1371 path_exclusive_only,
1372 &mut builder.frame_space_mut(),
1373 pn,
1374 &mut qlog,
1375 )
1376 };
1377
1378 debug_assert!(
1385 !(sent_frames.is_ack_only(&self.streams)
1386 && !can_send.acks
1387 && can_send.other
1388 && builder.buf.segment_size()
1389 == self.path_data(path_id).current_mtu() as usize
1390 && self.datagrams.outgoing.is_empty()),
1391 "SendableFrames was {can_send:?}, but only ACKs have been written"
1392 );
1393 if sent_frames.requires_padding {
1394 pad_datagram |= PadDatagram::ToMinMtu;
1395 }
1396
1397 for (path_id, _pn) in sent_frames.largest_acked.iter() {
1398 self.spaces[space_id]
1399 .for_path(*path_id)
1400 .pending_acks
1401 .acks_sent();
1402 self.timers.stop(
1403 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
1404 self.qlog.with_time(now),
1405 );
1406 }
1407
1408 if coalesce
1416 && builder
1417 .buf
1418 .datagram_remaining_mut()
1419 .saturating_sub(builder.predict_packet_end())
1420 > MIN_PACKET_SPACE
1421 && self
1422 .next_send_space(space_id, path_id, builder.buf, close)
1423 .is_some()
1424 {
1425 builder.finish_and_track(now, self, path_id, sent_frames, PadDatagram::No, qlog);
1428 } else {
1429 if builder.buf.num_datagrams() > 1 && matches!(pad_datagram, PadDatagram::No) {
1432 const MAX_PADDING: usize = 32;
1440 if builder.buf.datagram_remaining_mut()
1441 > builder.predict_packet_end() + MAX_PADDING
1442 {
1443 trace!(
1444 "GSO truncated by demand for {} padding bytes",
1445 builder.buf.datagram_remaining_mut() - builder.predict_packet_end()
1446 );
1447 builder.finish_and_track(
1448 now,
1449 self,
1450 path_id,
1451 sent_frames,
1452 PadDatagram::No,
1453 qlog,
1454 );
1455 break;
1456 }
1457
1458 builder.finish_and_track(
1461 now,
1462 self,
1463 path_id,
1464 sent_frames,
1465 PadDatagram::ToSegmentSize,
1466 qlog,
1467 );
1468 } else {
1469 builder.finish_and_track(now, self, path_id, sent_frames, pad_datagram, qlog);
1470 }
1471 if transmit.num_datagrams() == 1 {
1472 transmit.clip_datagram_size();
1473 }
1474 }
1475 }
1476
1477 if let Some(last_packet_number) = last_packet_number {
1478 self.path_data_mut(path_id).congestion.on_sent(
1481 now,
1482 transmit.len() as u64,
1483 last_packet_number,
1484 );
1485 }
1486
1487 self.qlog.emit_recovery_metrics(
1488 path_id,
1489 &mut self.paths.get_mut(&path_id).unwrap().data,
1490 now,
1491 );
1492
1493 self.app_limited = transmit.is_empty() && !congestion_blocked;
1494
1495 if transmit.is_empty() && self.state.is_established() {
1497 let space_id = SpaceId::Data;
1499 path_id = *self.paths.first_key_value().expect("one path must exist").0;
1500 let probe_data = loop {
1501 let active_cid = self.rem_cids.get(&path_id).map(CidQueue::active);
1507 let eligible = self.path_data(path_id).validated
1508 && !self.path_data(path_id).is_validating_path()
1509 && !self.abandoned_paths.contains(&path_id);
1510 let probe_size = eligible
1511 .then(|| {
1512 let next_pn = self.spaces[space_id].for_path(path_id).peek_tx_number();
1513 self.path_data_mut(path_id).mtud.poll_transmit(now, next_pn)
1514 })
1515 .flatten();
1516 match (active_cid, probe_size) {
1517 (Some(active_cid), Some(probe_size)) => {
1518 break Some((active_cid, probe_size));
1520 }
1521 _ => {
1522 match self.paths.keys().find(|&&next| next > path_id) {
1524 Some(next) => {
1525 path_id = *next;
1526 continue;
1527 }
1528 None => break None,
1529 }
1530 }
1531 }
1532 };
1533 if let Some((active_cid, probe_size)) = probe_data {
1534 debug_assert_eq!(transmit.num_datagrams(), 0);
1536 transmit.start_new_datagram_with_size(probe_size as usize);
1537
1538 let mut qlog = QlogSentPacket::default();
1539 let mut builder = PacketBuilder::new(
1540 now,
1541 space_id,
1542 path_id,
1543 active_cid,
1544 &mut transmit,
1545 true,
1546 self,
1547 &mut qlog,
1548 )?;
1549
1550 trace!(?probe_size, "writing MTUD probe");
1552 trace!("PING");
1553 builder.frame_space_mut().write(frame::FrameType::PING);
1554 qlog.frame(&Frame::Ping);
1555 self.stats.frame_tx.ping += 1;
1556
1557 if self.peer_supports_ack_frequency() {
1559 trace!("IMMEDIATE_ACK");
1560 builder
1561 .frame_space_mut()
1562 .write(frame::FrameType::IMMEDIATE_ACK);
1563 self.stats.frame_tx.immediate_ack += 1;
1564 qlog.frame(&Frame::ImmediateAck);
1565 }
1566
1567 let sent_frames = SentFrames {
1568 non_retransmits: true,
1569 ..Default::default()
1570 };
1571 builder.finish_and_track(
1572 now,
1573 self,
1574 path_id,
1575 sent_frames,
1576 PadDatagram::ToSize(probe_size),
1577 qlog,
1578 );
1579
1580 self.path_stats
1581 .entry(path_id)
1582 .or_default()
1583 .sent_plpmtud_probes += 1;
1584 }
1585 }
1586
1587 if transmit.is_empty() {
1588 return None;
1589 }
1590
1591 let destination = self.path_data(path_id).remote;
1592 trace!(
1593 segment_size = transmit.segment_size(),
1594 last_datagram_len = transmit.len() % transmit.segment_size(),
1595 ?destination,
1596 "sending {} bytes in {} datagrams",
1597 transmit.len(),
1598 transmit.num_datagrams()
1599 );
1600 self.path_data_mut(path_id)
1601 .inc_total_sent(transmit.len() as u64);
1602
1603 self.stats
1604 .udp_tx
1605 .on_sent(transmit.num_datagrams() as u64, transmit.len());
1606
1607 Some(Transmit {
1608 destination,
1609 size: transmit.len(),
1610 ecn: if self.path_data(path_id).sending_ecn {
1611 Some(EcnCodepoint::Ect0)
1612 } else {
1613 None
1614 },
1615 segment_size: match transmit.num_datagrams() {
1616 1 => None,
1617 _ => Some(transmit.segment_size()),
1618 },
1619 src_ip: self.local_ip,
1620 })
1621 }
1622
1623 fn next_send_space(
1628 &mut self,
1629 current_space_id: SpaceId,
1630 path_id: PathId,
1631 buf: &TransmitBuf<'_>,
1632 close: bool,
1633 ) -> Option<SpaceId> {
1634 let mut space_id = current_space_id;
1641 loop {
1642 let can_send = self.space_can_send(space_id, path_id, buf.segment_size(), close);
1643 if !can_send.is_empty() || (close && self.spaces[space_id].crypto.is_some()) {
1644 return Some(space_id);
1645 }
1646 space_id = match space_id {
1647 SpaceId::Initial => SpaceId::Handshake,
1648 SpaceId::Handshake => SpaceId::Data,
1649 SpaceId::Data => break,
1650 }
1651 }
1652 None
1653 }
1654
1655 fn path_congestion_check(
1657 &mut self,
1658 space_id: SpaceId,
1659 path_id: PathId,
1660 transmit: &TransmitBuf<'_>,
1661 can_send: &SendableFrames,
1662 now: Instant,
1663 ) -> PathBlocked {
1664 if self.side().is_server()
1670 && self
1671 .path_data(path_id)
1672 .anti_amplification_blocked(transmit.len() as u64 + 1)
1673 {
1674 trace!(?space_id, %path_id, "blocked by anti-amplification");
1675 return PathBlocked::AntiAmplification;
1676 }
1677
1678 let bytes_to_send = transmit.segment_size() as u64;
1681 let need_loss_probe = self.spaces[space_id].for_path(path_id).loss_probes > 0;
1682
1683 if can_send.other && !need_loss_probe && !can_send.close {
1684 let path = self.path_data(path_id);
1685 if path.in_flight.bytes + bytes_to_send >= path.congestion.window() {
1686 trace!(?space_id, %path_id, "blocked by congestion control");
1687 return PathBlocked::Congestion;
1688 }
1689 }
1690
1691 if let Some(delay) = self.path_data_mut(path_id).pacing_delay(bytes_to_send, now) {
1693 self.timers.set(
1694 Timer::PerPath(path_id, PathTimer::Pacing),
1695 delay,
1696 self.qlog.with_time(now),
1697 );
1698 trace!(?space_id, %path_id, "blocked by pacing");
1701 return PathBlocked::Pacing;
1702 }
1703
1704 PathBlocked::No
1705 }
1706
1707 fn send_prev_path_challenge(
1712 &mut self,
1713 now: Instant,
1714 buf: &mut TransmitBuf<'_>,
1715 path_id: PathId,
1716 ) -> Option<Transmit> {
1717 let (prev_cid, prev_path) = self.paths.get_mut(&path_id)?.prev.as_mut()?;
1718 if !prev_path.send_new_challenge {
1721 return None;
1722 };
1723 prev_path.send_new_challenge = false;
1724 let destination = prev_path.remote;
1725 let token = self.rng.random();
1726 let info = paths::SentChallengeInfo {
1727 sent_instant: now,
1728 remote: destination,
1729 };
1730 prev_path.challenges_sent.insert(token, info);
1731 debug_assert_eq!(
1732 self.highest_space,
1733 SpaceId::Data,
1734 "PATH_CHALLENGE queued without 1-RTT keys"
1735 );
1736 buf.start_new_datagram_with_size(MIN_INITIAL_SIZE as usize);
1737
1738 debug_assert_eq!(buf.datagram_start_offset(), 0);
1744 let mut qlog = QlogSentPacket::default();
1745 let mut builder = PacketBuilder::new(
1746 now,
1747 SpaceId::Data,
1748 path_id,
1749 *prev_cid,
1750 buf,
1751 false,
1752 self,
1753 &mut qlog,
1754 )?;
1755 let challenge = frame::PathChallenge(token);
1756 trace!(%challenge, "validating previous path");
1757 qlog.frame(&Frame::PathChallenge(challenge));
1758 builder.frame_space_mut().write(challenge);
1759 self.stats.frame_tx.path_challenge += 1;
1760
1761 builder.pad_to(MIN_INITIAL_SIZE);
1766
1767 builder.finish(self, now, qlog);
1768 self.stats.udp_tx.on_sent(1, buf.len());
1769
1770 Some(Transmit {
1771 destination,
1772 size: buf.len(),
1773 ecn: None,
1774 segment_size: None,
1775 src_ip: self.local_ip,
1776 })
1777 }
1778
1779 fn space_can_send(
1784 &mut self,
1785 space_id: SpaceId,
1786 path_id: PathId,
1787 packet_size: usize,
1788 close: bool,
1789 ) -> SendableFrames {
1790 let pn = self.spaces[SpaceId::Data]
1791 .for_path(path_id)
1792 .peek_tx_number();
1793 let frame_space_1rtt = packet_size.saturating_sub(self.predict_1rtt_overhead(pn, path_id));
1794 if self.spaces[space_id].crypto.is_none()
1795 && (space_id != SpaceId::Data
1796 || self.zero_rtt_crypto.is_none()
1797 || self.side.is_server())
1798 {
1799 return SendableFrames::empty();
1801 }
1802 let mut can_send = self.spaces[space_id].can_send(path_id, &self.streams);
1803 if space_id == SpaceId::Data {
1804 can_send |= self.can_send_1rtt(path_id, frame_space_1rtt);
1805 }
1806
1807 can_send.close = close && self.spaces[space_id].crypto.is_some();
1808
1809 can_send
1810 }
1811
1812 pub fn handle_event(&mut self, event: ConnectionEvent) {
1818 use ConnectionEventInner::*;
1819 match event.0 {
1820 Datagram(DatagramConnectionEvent {
1821 now,
1822 remote,
1823 path_id,
1824 ecn,
1825 first_decode,
1826 remaining,
1827 }) => {
1828 let span = trace_span!("pkt", %path_id);
1829 let _guard = span.enter();
1830 if let Some(known_remote) = self.path(path_id).map(|path| path.remote) {
1834 if remote != known_remote && !self.side.remote_may_migrate(&self.state) {
1835 trace!(
1836 %path_id,
1837 ?remote,
1838 path_remote = ?self.path(path_id).map(|p| p.remote),
1839 "discarding packet from unrecognized peer"
1840 );
1841 return;
1842 }
1843 }
1844
1845 let was_anti_amplification_blocked = self
1846 .path(path_id)
1847 .map(|path| path.anti_amplification_blocked(1))
1848 .unwrap_or(true); self.stats.udp_rx.datagrams += 1;
1852 self.stats.udp_rx.bytes += first_decode.len() as u64;
1853 let data_len = first_decode.len();
1854
1855 self.handle_decode(now, remote, path_id, ecn, first_decode);
1856 if let Some(path) = self.path_mut(path_id) {
1861 path.inc_total_recvd(data_len as u64);
1862 }
1863
1864 if let Some(data) = remaining {
1865 self.stats.udp_rx.bytes += data.len() as u64;
1866 self.handle_coalesced(now, remote, path_id, ecn, data);
1867 }
1868
1869 if let Some(path) = self.paths.get_mut(&path_id) {
1870 self.qlog
1871 .emit_recovery_metrics(path_id, &mut path.data, now);
1872 }
1873
1874 if was_anti_amplification_blocked {
1875 self.set_loss_detection_timer(now, path_id);
1879 }
1880 }
1881 NewIdentifiers(ids, now, cid_len, cid_lifetime) => {
1882 let path_id = ids.first().map(|issued| issued.path_id).unwrap_or_default();
1883 debug_assert!(ids.iter().all(|issued| issued.path_id == path_id));
1884 let cid_state = self
1885 .local_cid_state
1886 .entry(path_id)
1887 .or_insert_with(|| CidState::new(cid_len, cid_lifetime, now, 0));
1888 cid_state.new_cids(&ids, now);
1889
1890 ids.into_iter().rev().for_each(|frame| {
1891 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1892 });
1893 self.reset_cid_retirement(now);
1895 }
1896 }
1897 }
1898
    /// Processes every timer that has expired by `now`.
    ///
    /// Expired timers are pulled from `self.timers` one at a time and dispatched on their
    /// kind: connection-wide timers (`Timer::Conn`) and per-path timers (`Timer::PerPath`).
    pub fn handle_timeout(&mut self, now: Instant) {
        while let Some((timer, _time)) = self.timers.expire_before(now, &self.qlog) {
            trace!(?timer, at=?now, "timeout");
            match timer {
                Timer::Conn(timer) => match timer {
                    ConnTimer::Close => {
                        // Move to the drained state and tell the endpoint about it.
                        self.state.move_to_drained(None);
                        self.endpoint_events.push_back(EndpointEventInner::Drained);
                    }
                    ConnTimer::Idle => {
                        self.kill(ConnectionError::TimedOut);
                    }
                    ConnTimer::KeepAlive => {
                        trace!("sending keep-alive");
                        self.ping();
                    }
                    ConnTimer::KeyDiscard => {
                        // Drop both the 0-RTT keys and the previous key generation.
                        self.zero_rtt_crypto = None;
                        self.prev_crypto = None;
                    }
                    ConnTimer::PushNewCid => {
                        // Handle every CID retirement that is due at or before `now`.
                        while let Some((path_id, when)) = self.next_cid_retirement() {
                            if when > now {
                                break;
                            }
                            match self.local_cid_state.get_mut(&path_id) {
                                None => error!(%path_id, "No local CID state for path"),
                                Some(cid_state) => {
                                    let num_new_cid = cid_state.on_cid_timeout().into();
                                    // Replacement CIDs are only requested while the
                                    // connection is not closed.
                                    if !self.state.is_closed() {
                                        trace!(
                                            "push a new CID to peer RETIRE_PRIOR_TO field {}",
                                            cid_state.retire_prior_to()
                                        );
                                        self.endpoint_events.push_back(
                                            EndpointEventInner::NeedIdentifiers(
                                                path_id,
                                                now,
                                                num_new_cid,
                                            ),
                                        );
                                    }
                                }
                            }
                        }
                    }
                },
                Timer::PerPath(path_id, timer) => {
                    let span = trace_span!("per-path timer fired", %path_id, ?timer);
                    let _guard = span.enter();
                    match timer {
                        PathTimer::PathIdle => {
                            // Best effort: a failure to close the path is ignored.
                            self.close_path(now, path_id, TransportErrorCode::NO_ERROR.into())
                                .ok();
                        }

                        PathTimer::PathKeepAlive => {
                            trace!("sending keep-alive on path");
                            // Best effort: a failure to queue the ping is ignored.
                            self.ping_path(path_id).ok();
                        }
                        PathTimer::LossDetection => {
                            self.on_loss_detection_timeout(now, path_id);
                            // NOTE(review): this unwrap panics if the path is no longer in
                            // `self.paths` at this point — confirm that cannot happen.
                            self.qlog.emit_recovery_metrics(
                                path_id,
                                &mut self.paths.get_mut(&path_id).unwrap().data,
                                now,
                            );
                        }
                        PathTimer::PathValidation => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            self.timers.stop(
                                Timer::PerPath(path_id, PathTimer::PathChallengeLost),
                                self.qlog.with_time(now),
                            );
                            debug!("path validation failed");
                            // Fall back to the previous path data, if any was kept.
                            if let Some((_, prev)) = path.prev.take() {
                                path.data = prev;
                            }
                            path.data.challenges_sent.clear();
                            path.data.send_new_challenge = false;
                        }
                        PathTimer::PathChallengeLost => {
                            let Some(path) = self.paths.get_mut(&path_id) else {
                                continue;
                            };
                            trace!("path challenge deemed lost");
                            // Request that a fresh challenge be sent.
                            path.data.send_new_challenge = true;
                        }
                        PathTimer::PathOpen => {
                            let Some(path) = self.path_mut(path_id) else {
                                continue;
                            };
                            path.challenges_sent.clear();
                            path.send_new_challenge = false;
                            debug!("new path validation failed");
                            if let Err(err) = self.close_path(
                                now,
                                path_id,
                                TransportErrorCode::PATH_UNSTABLE_OR_POOR.into(),
                            ) {
                                warn!(?err, "failed closing path");
                            }

                            // The application is notified even if closing the path failed.
                            self.events.push_back(Event::Path(PathEvent::LocallyClosed {
                                id: path_id,
                                error: PathError::ValidationFailed,
                            }));
                        }
                        PathTimer::Pacing => trace!("pacing timer expired"),
                        PathTimer::MaxAckDelay => {
                            trace!("max ack delay reached");
                            // Always handled in the Data space.
                            self.spaces[SpaceId::Data]
                                .for_path(path_id)
                                .pending_acks
                                .on_max_ack_delay_timeout()
                        }
                        PathTimer::DiscardPath => {
                            // Stop all remaining timers of this path, retire every active
                            // local CID issued for it, then drop the path state.
                            self.timers.stop_per_path(path_id, self.qlog.with_time(now));
                            if let Some(loc_cid_state) = self.local_cid_state.remove(&path_id) {
                                let (min_seq, max_seq) = loc_cid_state.active_seq();
                                for seq in min_seq..=max_seq {
                                    self.endpoint_events.push_back(
                                        EndpointEventInner::RetireConnectionId(
                                            now, path_id, seq, false,
                                        ),
                                    );
                                }
                            }
                            self.discard_path(path_id, now);
                        }
                    }
                }
            }
        }
    }
2051
2052 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
2064 self.close_inner(
2065 now,
2066 Close::Application(frame::ApplicationClose { error_code, reason }),
2067 )
2068 }
2069
2070 fn close_inner(&mut self, now: Instant, reason: Close) {
2071 let was_closed = self.state.is_closed();
2072 if !was_closed {
2073 self.close_common();
2074 self.set_close_timer(now);
2075 self.close = true;
2076 self.state.move_to_closed_local(reason);
2077 }
2078 }
2079
    /// Returns a [`Datagrams`] handle that mutably borrows this connection.
    pub fn datagrams(&mut self) -> Datagrams<'_> {
        Datagrams { conn: self }
    }
2084
    /// Returns a copy (clone) of the connection-level statistics.
    pub fn stats(&mut self) -> ConnectionStats {
        self.stats.clone()
    }
2089
2090 pub fn path_stats(&mut self, path_id: PathId) -> Option<PathStats> {
2092 let path = self.paths.get(&path_id)?;
2093 let stats = self.path_stats.entry(path_id).or_default();
2094 stats.rtt = path.data.rtt.get();
2095 stats.cwnd = path.data.congestion.window();
2096 stats.current_mtu = path.data.mtud.current_mtu();
2097 Some(*stats)
2098 }
2099
2100 pub fn ping(&mut self) {
2104 for path_data in self.spaces[self.highest_space].number_spaces.values_mut() {
2107 path_data.ping_pending = true;
2108 }
2109 }
2110
2111 pub fn ping_path(&mut self, path: PathId) -> Result<(), ClosedPath> {
2115 let path_data = self.spaces[self.highest_space]
2116 .number_spaces
2117 .get_mut(&path)
2118 .ok_or(ClosedPath { _private: () })?;
2119 path_data.ping_pending = true;
2120 Ok(())
2121 }
2122
2123 pub fn force_key_update(&mut self) {
2127 if !self.state.is_established() {
2128 debug!("ignoring forced key update in illegal state");
2129 return;
2130 }
2131 if self.prev_crypto.is_some() {
2132 debug!("ignoring redundant forced key update");
2135 return;
2136 }
2137 self.update_keys(None, false);
2138 }
2139
    /// Deprecated alias that simply forwards to [`Self::force_key_update`].
    #[doc(hidden)]
    #[deprecated]
    pub fn initiate_key_update(&mut self) {
        self.force_key_update();
    }
2146
    /// Returns a reference to the connection's cryptographic session.
    pub fn crypto_session(&self) -> &dyn crypto::Session {
        &*self.crypto
    }
2151
    /// Returns whether the connection state reports that it is in the handshake.
    pub fn is_handshaking(&self) -> bool {
        self.state.is_handshake()
    }
2159
    /// Returns whether the connection state reports that it is closed.
    pub fn is_closed(&self) -> bool {
        self.state.is_closed()
    }
2170
    /// Returns whether the connection state reports that it is drained.
    pub fn is_drained(&self) -> bool {
        self.state.is_drained()
    }
2178
    /// Returns the connection's `accepted_0rtt` flag.
    // NOTE(review): presumably set once 0-RTT data was accepted — confirm where it is written.
    pub fn accepted_0rtt(&self) -> bool {
        self.accepted_0rtt
    }
2185
    /// Returns the connection's `zero_rtt_enabled` flag.
    pub fn has_0rtt(&self) -> bool {
        self.zero_rtt_enabled
    }
2190
2191 pub fn has_pending_retransmits(&self) -> bool {
2193 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
2194 }
2195
    /// Returns which [`Side`] of the connection this is.
    pub fn side(&self) -> Side {
        self.side.side()
    }
2200
2201 pub fn path_observed_address(&self, path_id: PathId) -> Result<Option<SocketAddr>, ClosedPath> {
2203 self.path(path_id)
2204 .map(|path_data| {
2205 path_data
2206 .last_observed_addr_report
2207 .as_ref()
2208 .map(|observed| observed.socket_addr())
2209 })
2210 .ok_or(ClosedPath { _private: () })
2211 }
2212
    /// Returns the local IP address stored for this connection, if any.
    ///
    /// The same value is used as `src_ip` of outgoing [`Transmit`]s.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.local_ip
    }
2225
2226 pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
2228 self.path(path_id).map(|d| d.rtt.get())
2229 }
2230
2231 pub fn congestion_state(&self, path_id: PathId) -> Option<&dyn Controller> {
2233 self.path(path_id).map(|d| d.congestion.as_ref())
2234 }
2235
2236 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
2241 self.streams.set_max_concurrent(dir, count);
2242 let pending = &mut self.spaces[SpaceId::Data].pending;
2245 self.streams.queue_max_stream_id(pending);
2246 }
2247
2248 pub fn set_max_concurrent_paths(
2258 &mut self,
2259 now: Instant,
2260 count: NonZeroU32,
2261 ) -> Result<(), MultipathNotNegotiated> {
2262 if !self.is_multipath_negotiated() {
2263 return Err(MultipathNotNegotiated { _private: () });
2264 }
2265 self.max_concurrent_paths = count;
2266
2267 let in_use_count = self
2268 .local_max_path_id
2269 .next()
2270 .saturating_sub(self.abandoned_paths.len() as u32)
2271 .as_u32();
2272 let extra_needed = count.get().saturating_sub(in_use_count);
2273 let new_max_path_id = self.local_max_path_id.saturating_add(extra_needed);
2274
2275 self.set_max_path_id(now, new_max_path_id);
2276
2277 Ok(())
2278 }
2279
2280 fn set_max_path_id(&mut self, now: Instant, max_path_id: PathId) {
2282 if max_path_id <= self.local_max_path_id {
2283 return;
2284 }
2285
2286 self.local_max_path_id = max_path_id;
2287 self.spaces[SpaceId::Data].pending.max_path_id = true;
2288
2289 self.issue_first_path_cids(now);
2290 }
2291
    /// Returns the current concurrent-stream limit for direction `dir`, as reported by the
    /// stream state.
    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
        self.streams.max_concurrent(dir)
    }
2300
    /// Forwards the new send window size to the stream state.
    pub fn set_send_window(&mut self, send_window: u64) {
        self.streams.set_send_window(send_window);
    }
2305
2306 pub fn set_receive_window(&mut self, receive_window: VarInt) {
2308 if self.streams.set_receive_window(receive_window) {
2309 self.spaces[SpaceId::Data].pending.max_data = true;
2310 }
2311 }
2312
2313 pub fn is_multipath_negotiated(&self) -> bool {
2318 !self.is_handshaking()
2319 && self.config.max_concurrent_multipath_paths.is_some()
2320 && self.peer_params.initial_max_path_id.is_some()
2321 }
2322
2323 fn on_ack_received(
2324 &mut self,
2325 now: Instant,
2326 space: SpaceId,
2327 ack: frame::Ack,
2328 ) -> Result<(), TransportError> {
2329 let path = PathId::ZERO;
2331 self.inner_on_ack_received(now, space, path, ack)
2332 }
2333
    /// Handles a PATH_ACK frame by splitting it into its ACK part and the path it refers to.
    ///
    /// # Errors
    ///
    /// Propagates any [`TransportError`] from [`Self::inner_on_ack_received`].
    fn on_path_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path_ack: frame::PathAck,
    ) -> Result<(), TransportError> {
        let (ack, path) = path_ack.into_ack();
        self.inner_on_ack_received(now, space, path, ack)
    }
2343
    /// Processes an acknowledgement for packets sent in `space` on `path`.
    ///
    /// Shared implementation behind [`Self::on_ack_received`] and
    /// [`Self::on_path_ack_received`]. In order, it updates the largest acknowledged packet,
    /// checks for spurious losses, collects and handles the newly acknowledged packets,
    /// informs the congestion controller, takes an RTT sample when appropriate, runs loss
    /// detection, processes ECN feedback and finally re-arms the loss detection timer.
    ///
    /// # Errors
    ///
    /// Returns a `PROTOCOL_VIOLATION` transport error if the ACK covers a packet number
    /// that has not been sent yet, and propagates errors from `check_ack`.
    fn inner_on_ack_received(
        &mut self,
        now: Instant,
        space: SpaceId,
        path: PathId,
        ack: frame::Ack,
    ) -> Result<(), TransportError> {
        // Acknowledgements for abandoned paths are ignored without error.
        if self.abandoned_paths.contains(&path) {
            trace!("silently ignoring PATH_ACK on abandoned path");
            return Ok(());
        }
        if ack.largest >= self.spaces[space].for_path(path).next_packet_number {
            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
        }
        // Whether this ACK raises the largest acknowledged packet number.
        let new_largest = {
            let space = &mut self.spaces[space].for_path(path);
            if space.largest_acked_packet.is_none_or(|pn| ack.largest > pn) {
                space.largest_acked_packet = Some(ack.largest);
                // Remember the send time of that packet if it is still tracked.
                if let Some(info) = space.sent_packets.get(ack.largest) {
                    space.largest_acked_packet_sent = info.time_sent;
                }
                true
            } else {
                false
            }
        };

        if self.detect_spurious_loss(&ack, space, path) {
            self.path_data_mut(path)
                .congestion
                .on_spurious_congestion_event();
        }

        // Collect only packet numbers that are still tracked as sent, so the work below is
        // bounded by what we actually sent rather than by the ranges in the frame.
        let mut newly_acked = ArrayRangeSet::new();
        for range in ack.iter() {
            self.spaces[space].for_path(path).check_ack(range.clone())?;
            for (pn, _) in self.spaces[space]
                .for_path(path)
                .sent_packets
                .iter_range(range)
            {
                newly_acked.insert_one(pn);
            }
        }

        if newly_acked.is_empty() {
            return Ok(());
        }

        let mut ack_eliciting_acked = false;
        for packet in newly_acked.elts() {
            if let Some(info) = self.spaces[space].for_path(path).take(packet) {
                // The acknowledged packet recorded, per path, a largest-acked packet
                // number; pending ACKs below those numbers are dropped.
                for (acked_path_id, acked_pn) in info.largest_acked.iter() {
                    if let Some(pns) = self.spaces[space].path_space_mut(*acked_path_id) {
                        pns.pending_acks.subtract_below(*acked_pn);
                    }
                }
                ack_eliciting_acked |= info.ack_eliciting;

                // Let MTU discovery see the ack; propagate an MTU change to congestion
                // control.
                let path_data = self.path_data_mut(path);
                let mtu_updated = path_data.mtud.on_acked(space, packet, info.size);
                if mtu_updated {
                    path_data
                        .congestion
                        .on_mtu_update(path_data.mtud.current_mtu());
                }

                self.ack_frequency.on_acked(path, packet);

                self.on_packet_acked(now, path, info);
            }
        }

        let largest_ackd = self.spaces[space].for_path(path).largest_acked_packet;
        let app_limited = self.app_limited;
        let path_data = self.path_data_mut(path);
        let in_flight = path_data.in_flight.bytes;

        path_data
            .congestion
            .on_end_acks(now, in_flight, app_limited, largest_ackd);

        // An RTT sample is only taken when the largest acknowledged packet advanced and at
        // least one newly acknowledged packet was ack-eliciting.
        if new_largest && ack_eliciting_acked {
            // Outside the Data space the ack delay is taken as zero; inside it the
            // peer-reported delay is capped at the peer's max ack delay.
            let ack_delay = if space != SpaceId::Data {
                Duration::from_micros(0)
            } else {
                cmp::min(
                    self.ack_frequency.peer_max_ack_delay,
                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
                )
            };
            let rtt = now.saturating_duration_since(
                self.spaces[space].for_path(path).largest_acked_packet_sent,
            );

            let next_pn = self.spaces[space].for_path(path).next_packet_number;
            let path_data = self.path_data_mut(path);
            path_data.rtt.update(ack_delay, rtt);
            // Record the first packet number to be sent after the first RTT sample.
            if path_data.first_packet_after_rtt_sample.is_none() {
                path_data.first_packet_after_rtt_sample = Some((space, next_pn));
            }
        }

        // Loss detection runs before the PTO count is reset below.
        self.detect_lost_packets(now, space, path, true);

        if self.peer_completed_address_validation(path) {
            self.path_data_mut(path).pto_count = 0;
        }

        if self.path_data(path).sending_ecn {
            if let Some(ecn) = ack.ecn {
                // ECN counts are only evaluated for ACKs that advance the largest
                // acknowledged packet.
                if new_largest {
                    let sent = self.spaces[space].for_path(path).largest_acked_packet_sent;
                    self.process_ecn(now, space, path, newly_acked.len() as u64, ecn, sent);
                }
            } else {
                // The ACK carried no ECN counts: stop sending ECN on this path.
                debug!("ECN not acknowledged by peer");
                self.path_data_mut(path).sending_ecn = false;
            }
        }

        self.set_loss_detection_timer(now, path);
        Ok(())
    }
2493
2494 fn detect_spurious_loss(&mut self, ack: &frame::Ack, space: SpaceId, path: PathId) -> bool {
2495 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2496
2497 if lost_packets.is_empty() {
2498 return false;
2499 }
2500
2501 for range in ack.iter() {
2502 let spurious_losses: Vec<u64> = lost_packets
2503 .iter_range(range.clone())
2504 .map(|(pn, _info)| pn)
2505 .collect();
2506
2507 for pn in spurious_losses {
2508 lost_packets.remove(pn);
2509 }
2510 }
2511
2512 lost_packets.is_empty()
2517 }
2518
2519 fn drain_lost_packets(&mut self, now: Instant, space: SpaceId, path: PathId) {
2524 let two_pto = 2 * self.path_data(path).rtt.pto_base();
2525
2526 let lost_packets = &mut self.spaces[space].for_path(path).lost_packets;
2527 lost_packets.retain(|_pn, info| now.saturating_duration_since(info.time_sent) <= two_pto);
2528 }
2529
2530 fn process_ecn(
2532 &mut self,
2533 now: Instant,
2534 space: SpaceId,
2535 path: PathId,
2536 newly_acked: u64,
2537 ecn: frame::EcnCounts,
2538 largest_sent_time: Instant,
2539 ) {
2540 match self.spaces[space]
2541 .for_path(path)
2542 .detect_ecn(newly_acked, ecn)
2543 {
2544 Err(e) => {
2545 debug!("halting ECN due to verification failure: {}", e);
2546
2547 self.path_data_mut(path).sending_ecn = false;
2548 self.spaces[space].for_path(path).ecn_feedback = frame::EcnCounts::ZERO;
2551 }
2552 Ok(false) => {}
2553 Ok(true) => {
2554 self.path_stats.entry(path).or_default().congestion_events += 1;
2555 self.path_data_mut(path).congestion.on_congestion_event(
2556 now,
2557 largest_sent_time,
2558 false,
2559 true,
2560 0,
2561 );
2562 }
2563 }
2564 }
2565
2566 fn on_packet_acked(&mut self, now: Instant, path_id: PathId, info: SentPacket) {
2569 self.paths
2570 .get_mut(&path_id)
2571 .expect("known path")
2572 .remove_in_flight(&info);
2573 let app_limited = self.app_limited;
2574 let path = self.path_data_mut(path_id);
2575 if info.ack_eliciting && !path.is_validating_path() {
2576 let rtt = path.rtt;
2579 path.congestion
2580 .on_ack(now, info.time_sent, info.size.into(), app_limited, &rtt);
2581 }
2582
2583 if let Some(retransmits) = info.retransmits.get() {
2585 for (id, _) in retransmits.reset_stream.iter() {
2586 self.streams.reset_acked(*id);
2587 }
2588 }
2589
2590 for frame in info.stream_frames {
2591 self.streams.received_ack_of(frame);
2592 }
2593 }
2594
2595 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
2596 let start = if self.zero_rtt_crypto.is_some() {
2597 now
2598 } else {
2599 self.prev_crypto
2600 .as_ref()
2601 .expect("no previous keys")
2602 .end_packet
2603 .as_ref()
2604 .expect("update not acknowledged yet")
2605 .1
2606 };
2607
2608 self.timers.set(
2610 Timer::Conn(ConnTimer::KeyDiscard),
2611 start + self.pto_max_path(space) * 3,
2612 self.qlog.with_time(now),
2613 );
2614 }
2615
2616 fn on_loss_detection_timeout(&mut self, now: Instant, path_id: PathId) {
2629 if let Some((_, pn_space)) = self.loss_time_and_space(path_id) {
2630 self.detect_lost_packets(now, pn_space, path_id, false);
2632 self.set_loss_detection_timer(now, path_id);
2633 return;
2634 }
2635
2636 let (_, space) = match self.pto_time_and_space(now, path_id) {
2637 Some(x) => x,
2638 None => {
2639 error!(%path_id, "PTO expired while unset");
2640 return;
2641 }
2642 };
2643 trace!(
2644 in_flight = self.path_data(path_id).in_flight.bytes,
2645 count = self.path_data(path_id).pto_count,
2646 ?space,
2647 %path_id,
2648 "PTO fired"
2649 );
2650
2651 let count = match self.path_data(path_id).in_flight.ack_eliciting {
2652 0 => {
2655 debug_assert!(!self.peer_completed_address_validation(path_id));
2656 1
2657 }
2658 _ => 2,
2660 };
2661 let pns = self.spaces[space].for_path(path_id);
2662 pns.loss_probes = pns.loss_probes.saturating_add(count);
2663 let path_data = self.path_data_mut(path_id);
2664 path_data.pto_count = path_data.pto_count.saturating_add(1);
2665 self.set_loss_detection_timer(now, path_id);
2666 }
2667
    /// Scans the sent packets of `pn_space` on `path_id` and declares losses.
    ///
    /// A packet below the largest acknowledged packet number is declared lost when it was
    /// sent at least `loss_delay` ago, or when it is at least `packet_threshold` packet
    /// numbers behind the largest acknowledged one. For the first packet that is not yet
    /// lost, the space's `loss_time` is set to when it will become lost by age.
    ///
    /// Persistent congestion is only evaluated when `due_to_ack` is `true`. All results are
    /// forwarded to [`Self::handle_lost_packets`].
    ///
    /// # Panics
    ///
    /// Panics if no packet has been acknowledged yet in this space on this path.
    fn detect_lost_packets(
        &mut self,
        now: Instant,
        pn_space: SpaceId,
        path_id: PathId,
        due_to_ack: bool,
    ) {
        let mut lost_packets = Vec::<u64>::new();
        let mut lost_mtu_probe = None;
        let mut in_persistent_congestion = false;
        let mut size_of_lost_packets = 0u64;
        // The loss time is recomputed from scratch by the loop below.
        self.spaces[pn_space].for_path(path_id).loss_time = None;

        let path = self.path_data(path_id);
        let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
        // Age after which an unacknowledged packet counts as lost, never below the timer
        // granularity.
        let loss_delay = path
            .rtt
            .conservative()
            .mul_f32(self.config.time_threshold)
            .max(TIMER_GRANULARITY);
        let first_packet_after_rtt_sample = path.first_packet_after_rtt_sample;

        let largest_acked_packet = self.spaces[pn_space]
            .for_path(path_id)
            .largest_acked_packet
            .expect("detect_lost_packets only to be called if path received at least one ACK");
        let packet_threshold = self.config.packet_threshold as u64;

        // The persistent congestion period is always derived from the Data-space PTO.
        let congestion_period = self
            .pto(SpaceId::Data, path_id)
            .saturating_mul(self.config.persistent_congestion_threshold);
        let mut persistent_congestion_start: Option<Instant> = None;
        let mut prev_packet = None;
        let space = self.spaces[pn_space].for_path(path_id);

        for (packet, info) in space.sent_packets.iter_range(0..largest_acked_packet) {
            // A gap in the tracked packet numbers restarts the persistent congestion
            // window.
            if prev_packet != Some(packet.wrapping_sub(1)) {
                persistent_congestion_start = None;
            }

            // Expressed via a saturating "time since sent" rather than by subtracting
            // `loss_delay` from `now`.
            let packet_too_old = now.saturating_duration_since(info.time_sent) >= loss_delay;
            if packet_too_old || largest_acked_packet >= packet + packet_threshold {
                if Some(packet) == in_flight_mtu_probe {
                    // A lost MTU probe is reported separately and is kept out of
                    // `lost_packets` and the lost byte count.
                    lost_mtu_probe = in_flight_mtu_probe;
                } else {
                    lost_packets.push(packet);
                    size_of_lost_packets += info.size as u64;
                    if info.ack_eliciting && due_to_ack {
                        match persistent_congestion_start {
                            // Consecutive ack-eliciting losses spanning more than the
                            // congestion period.
                            Some(start) if info.time_sent - start > congestion_period => {
                                in_persistent_congestion = true;
                            }
                            // The window may only start after the first RTT sample.
                            None if first_packet_after_rtt_sample
                                .is_some_and(|x| x < (pn_space, packet)) =>
                            {
                                persistent_congestion_start = Some(info.time_sent);
                            }
                            _ => {}
                        }
                    }
                }
            } else {
                // Not lost yet: only the first such packet sets the loss time.
                if space.loss_time.is_none() {
                    space.loss_time = Some(info.time_sent + loss_delay);
                }
                persistent_congestion_start = None;
            }

            prev_packet = Some(packet);
        }

        self.handle_lost_packets(
            pn_space,
            path_id,
            now,
            lost_packets,
            lost_mtu_probe,
            loss_delay,
            in_persistent_congestion,
            size_of_lost_packets,
        );
    }
2784
2785 fn discard_path(&mut self, path_id: PathId, now: Instant) {
2787 trace!(%path_id, "dropping path state");
2788 let path = self.path_data(path_id);
2789 let in_flight_mtu_probe = path.mtud.in_flight_mtu_probe();
2790
2791 let mut size_of_lost_packets = 0u64; let lost_pns: Vec<_> = self.spaces[SpaceId::Data]
2793 .for_path(path_id)
2794 .sent_packets
2795 .iter()
2796 .filter(|(pn, _info)| Some(*pn) != in_flight_mtu_probe)
2797 .map(|(pn, info)| {
2798 size_of_lost_packets += info.size as u64;
2799 pn
2800 })
2801 .collect();
2802
2803 if !lost_pns.is_empty() {
2804 trace!(
2805 %path_id,
2806 count = lost_pns.len(),
2807 lost_bytes = size_of_lost_packets,
2808 "packets lost on path abandon"
2809 );
2810 self.handle_lost_packets(
2811 SpaceId::Data,
2812 path_id,
2813 now,
2814 lost_pns,
2815 in_flight_mtu_probe,
2816 Duration::ZERO,
2817 false,
2818 size_of_lost_packets,
2819 );
2820 }
2821 self.paths.remove(&path_id);
2822 self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
2823
2824 let path_stats = self.path_stats.remove(&path_id).unwrap_or_default();
2825 self.events.push_back(
2826 PathEvent::Abandoned {
2827 id: path_id,
2828 path_stats,
2829 }
2830 .into(),
2831 );
2832 }
2833
    /// Applies the consequences of packets having been declared lost.
    ///
    /// `lost_packets` must be sorted in ascending order and must not contain
    /// `lost_mtu_probe`, which is handled separately at the end. For each lost packet the
    /// in-flight accounting is updated, its stream frames and other retransmittable data
    /// are queued again, MTU discovery is informed, and the packet is remembered in the
    /// space's `lost_packets` set. If any in-flight bytes were removed, a congestion event
    /// is reported to the congestion controller.
    ///
    /// # Panics
    ///
    /// Panics if the largest entry of `lost_packets` or `lost_mtu_probe` is not tracked as
    /// a sent packet, or if `path_id` is not a known path.
    fn handle_lost_packets(
        &mut self,
        pn_space: SpaceId,
        path_id: PathId,
        now: Instant,
        lost_packets: Vec<u64>,
        lost_mtu_probe: Option<u64>,
        loss_delay: Duration,
        in_persistent_congestion: bool,
        size_of_lost_packets: u64,
    ) {
        debug_assert!(
            {
                let mut sorted = lost_packets.clone();
                sorted.sort();
                sorted == lost_packets
            },
            "lost_packets must be sorted"
        );

        // Expire old entries of the lost-packet set before adding new ones.
        self.drain_lost_packets(now, pn_space, path_id);

        if let Some(largest_lost) = lost_packets.last().cloned() {
            // Snapshot taken to detect below whether any in-flight bytes were removed.
            let old_bytes_in_flight = self.path_data_mut(path_id).in_flight.bytes;
            let largest_lost_sent = self.spaces[pn_space]
                .for_path(path_id)
                .sent_packets
                .get(largest_lost)
                .unwrap()
                .time_sent;
            let path_stats = self.path_stats.entry(path_id).or_default();
            path_stats.lost_packets += lost_packets.len() as u64;
            path_stats.lost_bytes += size_of_lost_packets;
            trace!(
                %path_id,
                count = lost_packets.len(),
                lost_bytes = size_of_lost_packets,
                "packets lost",
            );

            for &packet in &lost_packets {
                // Skip packets that are no longer tracked as sent.
                let Some(info) = self.spaces[pn_space].for_path(path_id).take(packet) else {
                    continue;
                };
                self.qlog
                    .emit_packet_lost(packet, &info, loss_delay, pn_space, now);
                self.paths
                    .get_mut(&path_id)
                    .unwrap()
                    .remove_in_flight(&info);

                // Queue the packet's contents for retransmission.
                for frame in info.stream_frames {
                    self.streams.retransmit(frame);
                }
                self.spaces[pn_space].pending |= info.retransmits;
                self.path_data_mut(path_id)
                    .mtud
                    .on_non_probe_lost(packet, info.size);

                // Remember the loss; `detect_spurious_loss` consults this set later.
                self.spaces[pn_space].for_path(path_id).lost_packets.insert(
                    packet,
                    LostPacket {
                        time_sent: info.time_sent,
                    },
                );
            }

            let path = self.path_data_mut(path_id);
            if path.mtud.black_hole_detected(now) {
                // Tell congestion control about the MTU now in effect and drop queued
                // datagrams that exceed the current maximum datagram size.
                path.congestion.on_mtu_update(path.mtud.current_mtu());
                if let Some(max_datagram_size) = self.datagrams().max_size() {
                    self.datagrams.drop_oversized(max_datagram_size);
                }
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .black_holes_detected += 1;
            }

            // A congestion event is only raised if the in-flight byte count changed.
            let lost_ack_eliciting =
                old_bytes_in_flight != self.path_data_mut(path_id).in_flight.bytes;

            if lost_ack_eliciting {
                self.path_stats
                    .entry(path_id)
                    .or_default()
                    .congestion_events += 1;
                self.path_data_mut(path_id).congestion.on_congestion_event(
                    now,
                    largest_lost_sent,
                    in_persistent_congestion,
                    false,
                    size_of_lost_packets,
                );
            }
        }

        if let Some(packet) = lost_mtu_probe {
            // The probe is not part of `lost_packets`, so the loop above did not take it;
            // it is always looked up in the Data space.
            let info = self.spaces[SpaceId::Data]
                .for_path(path_id)
                .take(packet)
                .unwrap();
            self.paths
                .get_mut(&path_id)
                .unwrap()
                .remove_in_flight(&info);
            self.path_data_mut(path_id).mtud.on_probe_lost();
            self.path_stats
                .entry(path_id)
                .or_default()
                .lost_plpmtud_probes += 1;
        }
    }
2951
2952 fn loss_time_and_space(&self, path_id: PathId) -> Option<(Instant, SpaceId)> {
2958 SpaceId::iter()
2959 .filter_map(|id| {
2960 self.spaces[id]
2961 .number_spaces
2962 .get(&path_id)
2963 .and_then(|pns| pns.loss_time)
2964 .map(|time| (time, id))
2965 })
2966 .min_by_key(|&(time, _)| time)
2967 }
2968
2969 fn pto_time_and_space(&mut self, now: Instant, path_id: PathId) -> Option<(Instant, SpaceId)> {
2971 let path = self.path(path_id)?;
2972 let pto_count = path.pto_count;
2973 let backoff = 2u32.pow(pto_count.min(MAX_BACKOFF_EXPONENT));
2974 let mut duration = path.rtt.pto_base() * backoff;
2975
2976 if path_id == PathId::ZERO
2977 && path.in_flight.ack_eliciting == 0
2978 && !self.peer_completed_address_validation(PathId::ZERO)
2979 {
2980 let space = match self.highest_space {
2986 SpaceId::Handshake => SpaceId::Handshake,
2987 _ => SpaceId::Initial,
2988 };
2989
2990 return Some((now + duration, space));
2991 }
2992
2993 let mut result = None;
2994 for space in SpaceId::iter() {
2995 let Some(pns) = self.spaces[space].number_spaces.get(&path_id) else {
2996 continue;
2997 };
2998
2999 if !pns.has_in_flight() {
3000 continue;
3001 }
3002 if space == SpaceId::Data {
3003 if self.is_handshaking() {
3005 return result;
3006 }
3007 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
3009 }
3010 let Some(last_ack_eliciting) = pns.time_of_last_ack_eliciting_packet else {
3011 continue;
3012 };
3013 let pto = last_ack_eliciting + duration;
3014 if result.is_none_or(|(earliest_pto, _)| pto < earliest_pto) {
3015 if path.anti_amplification_blocked(1) {
3016 continue;
3018 }
3019 if path.in_flight.ack_eliciting == 0 {
3020 continue;
3022 }
3023 result = Some((pto, space));
3024 }
3025 }
3026 result
3027 }
3028
3029 fn peer_completed_address_validation(&self, path: PathId) -> bool {
3030 if self.side.is_server() || self.state.is_closed() {
3032 return true;
3033 }
3034 self.spaces[SpaceId::Handshake]
3037 .path_space(PathId::ZERO)
3038 .and_then(|pns| pns.largest_acked_packet)
3039 .is_some()
3040 || self.spaces[SpaceId::Data]
3041 .path_space(path)
3042 .and_then(|pns| pns.largest_acked_packet)
3043 .is_some()
3044 || (self.spaces[SpaceId::Data].crypto.is_some()
3045 && self.spaces[SpaceId::Handshake].crypto.is_none())
3046 }
3047
3048 fn set_loss_detection_timer(&mut self, now: Instant, path_id: PathId) {
3056 if self.state.is_closed() {
3057 return;
3061 }
3062
3063 if let Some((loss_time, _)) = self.loss_time_and_space(path_id) {
3064 self.timers.set(
3066 Timer::PerPath(path_id, PathTimer::LossDetection),
3067 loss_time,
3068 self.qlog.with_time(now),
3069 );
3070 return;
3071 }
3072
3073 if let Some((timeout, _)) = self.pto_time_and_space(now, path_id) {
3076 self.timers.set(
3077 Timer::PerPath(path_id, PathTimer::LossDetection),
3078 timeout,
3079 self.qlog.with_time(now),
3080 );
3081 } else {
3082 self.timers.stop(
3083 Timer::PerPath(path_id, PathTimer::LossDetection),
3084 self.qlog.with_time(now),
3085 );
3086 }
3087 }
3088
3089 fn pto_max_path(&self, space: SpaceId) -> Duration {
3093 match space {
3094 SpaceId::Initial | SpaceId::Handshake => self.pto(space, PathId::ZERO),
3095 SpaceId::Data => self
3096 .paths
3097 .keys()
3098 .map(|path_id| self.pto(space, *path_id))
3099 .max()
3100 .expect("there should be one at least path"),
3101 }
3102 }
3103
3104 fn pto(&self, space: SpaceId, path_id: PathId) -> Duration {
3109 let max_ack_delay = match space {
3110 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
3111 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
3112 };
3113 self.path_data(path_id).rtt.pto_base() + max_ack_delay
3114 }
3115
3116 fn on_packet_authenticated(
3117 &mut self,
3118 now: Instant,
3119 space_id: SpaceId,
3120 path_id: PathId,
3121 ecn: Option<EcnCodepoint>,
3122 packet: Option<u64>,
3123 spin: bool,
3124 is_1rtt: bool,
3125 ) {
3126 self.total_authed_packets += 1;
3127 if let Some(last_allowed_receive) = self
3128 .paths
3129 .get(&path_id)
3130 .and_then(|path| path.data.last_allowed_receive)
3131 {
3132 if now > last_allowed_receive {
3133 warn!("received data on path which we abandoned more than 3 * PTO ago");
3134 if !self.state.is_closed() {
3136 self.state.move_to_closed(TransportError::NO_ERROR(
3138 "peer failed to respond with PATH_ABANDON in time",
3139 ));
3140 self.close_common();
3141 self.set_close_timer(now);
3142 self.close = true;
3143 }
3144 return;
3145 }
3146 }
3147
3148 self.reset_keep_alive(path_id, now);
3149 self.reset_idle_timeout(now, space_id, path_id);
3150 self.permit_idle_reset = true;
3151 self.receiving_ecn |= ecn.is_some();
3152 if let Some(x) = ecn {
3153 let space = &mut self.spaces[space_id];
3154 space.for_path(path_id).ecn_counters += x;
3155
3156 if x.is_ce() {
3157 space
3158 .for_path(path_id)
3159 .pending_acks
3160 .set_immediate_ack_required();
3161 }
3162 }
3163
3164 let packet = match packet {
3165 Some(x) => x,
3166 None => return,
3167 };
3168 match &self.side {
3169 ConnectionSide::Client { .. } => {
3170 if space_id == SpaceId::Handshake {
3174 if let Some(hs) = self.state.as_handshake_mut() {
3175 hs.allow_server_migration = false;
3176 }
3177 }
3178 }
3179 ConnectionSide::Server { .. } => {
3180 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake
3181 {
3182 self.discard_space(now, SpaceId::Initial);
3184 }
3185 if self.zero_rtt_crypto.is_some() && is_1rtt {
3186 self.set_key_discard_timer(now, space_id)
3188 }
3189 }
3190 }
3191 let space = self.spaces[space_id].for_path(path_id);
3192 space.pending_acks.insert_one(packet, now);
3193 if packet >= space.rx_packet.unwrap_or_default() {
3194 space.rx_packet = Some(packet);
3195 self.spin = self.side.is_client() ^ spin;
3197 }
3198 }
3199
3200 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId, path_id: PathId) {
3205 if let Some(timeout) = self.idle_timeout {
3207 if self.state.is_closed() {
3208 self.timers
3209 .stop(Timer::Conn(ConnTimer::Idle), self.qlog.with_time(now));
3210 } else {
3211 let dt = cmp::max(timeout, 3 * self.pto_max_path(space));
3212 self.timers.set(
3213 Timer::Conn(ConnTimer::Idle),
3214 now + dt,
3215 self.qlog.with_time(now),
3216 );
3217 }
3218 }
3219
3220 if let Some(timeout) = self.path_data(path_id).idle_timeout {
3222 if self.state.is_closed() {
3223 self.timers.stop(
3224 Timer::PerPath(path_id, PathTimer::PathIdle),
3225 self.qlog.with_time(now),
3226 );
3227 } else {
3228 let dt = cmp::max(timeout, 3 * self.pto(space, path_id));
3229 self.timers.set(
3230 Timer::PerPath(path_id, PathTimer::PathIdle),
3231 now + dt,
3232 self.qlog.with_time(now),
3233 );
3234 }
3235 }
3236 }
3237
3238 fn reset_keep_alive(&mut self, path_id: PathId, now: Instant) {
3240 if !self.state.is_established() {
3241 return;
3242 }
3243
3244 if let Some(interval) = self.config.keep_alive_interval {
3245 self.timers.set(
3246 Timer::Conn(ConnTimer::KeepAlive),
3247 now + interval,
3248 self.qlog.with_time(now),
3249 );
3250 }
3251
3252 if let Some(interval) = self.path_data(path_id).keep_alive {
3253 self.timers.set(
3254 Timer::PerPath(path_id, PathTimer::PathKeepAlive),
3255 now + interval,
3256 self.qlog.with_time(now),
3257 );
3258 }
3259 }
3260
3261 fn reset_cid_retirement(&mut self, now: Instant) {
3263 if let Some((_path, t)) = self.next_cid_retirement() {
3264 self.timers.set(
3265 Timer::Conn(ConnTimer::PushNewCid),
3266 t,
3267 self.qlog.with_time(now),
3268 );
3269 }
3270 }
3271
3272 fn next_cid_retirement(&self) -> Option<(PathId, Instant)> {
3274 self.local_cid_state
3275 .iter()
3276 .filter_map(|(path_id, cid_state)| cid_state.next_timeout().map(|t| (*path_id, t)))
3277 .min_by_key(|(_path_id, timeout)| *timeout)
3278 }
3279
3280 pub(crate) fn handle_first_packet(
3285 &mut self,
3286 now: Instant,
3287 remote: SocketAddr,
3288 ecn: Option<EcnCodepoint>,
3289 packet_number: u64,
3290 packet: InitialPacket,
3291 remaining: Option<BytesMut>,
3292 ) -> Result<(), ConnectionError> {
3293 let span = trace_span!("first recv");
3294 let _guard = span.enter();
3295 debug_assert!(self.side.is_server());
3296 let len = packet.header_data.len() + packet.payload.len();
3297 let path_id = PathId::ZERO;
3298 self.path_data_mut(path_id).total_recvd = len as u64;
3299
3300 if let Some(hs) = self.state.as_handshake_mut() {
3301 hs.expected_token = packet.header.token.clone();
3302 } else {
3303 unreachable!("first packet must be delivered in Handshake state");
3304 }
3305
3306 self.on_packet_authenticated(
3308 now,
3309 SpaceId::Initial,
3310 path_id,
3311 ecn,
3312 Some(packet_number),
3313 false,
3314 false,
3315 );
3316
3317 let packet: Packet = packet.into();
3318
3319 let mut qlog = QlogRecvPacket::new(len);
3320 qlog.header(&packet.header, Some(packet_number), path_id);
3321
3322 self.process_decrypted_packet(
3323 now,
3324 remote,
3325 path_id,
3326 Some(packet_number),
3327 packet,
3328 &mut qlog,
3329 )?;
3330 self.qlog.emit_packet_received(qlog, now);
3331 if let Some(data) = remaining {
3332 self.handle_coalesced(now, remote, path_id, ecn, data);
3333 }
3334
3335 self.qlog.emit_recovery_metrics(
3336 path_id,
3337 &mut self.paths.get_mut(&path_id).unwrap().data,
3338 now,
3339 );
3340
3341 Ok(())
3342 }
3343
3344 fn init_0rtt(&mut self, now: Instant) {
3345 let (header, packet) = match self.crypto.early_crypto() {
3346 Some(x) => x,
3347 None => return,
3348 };
3349 if self.side.is_client() {
3350 match self.crypto.transport_parameters() {
3351 Ok(params) => {
3352 let params = params
3353 .expect("crypto layer didn't supply transport parameters with ticket");
3354 let params = TransportParameters {
3356 initial_src_cid: None,
3357 original_dst_cid: None,
3358 preferred_address: None,
3359 retry_src_cid: None,
3360 stateless_reset_token: None,
3361 min_ack_delay: None,
3362 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
3363 max_ack_delay: TransportParameters::default().max_ack_delay,
3364 initial_max_path_id: None,
3365 ..params
3366 };
3367 self.set_peer_params(params);
3368 self.qlog.emit_peer_transport_params_restored(self, now);
3369 }
3370 Err(e) => {
3371 error!("session ticket has malformed transport parameters: {}", e);
3372 return;
3373 }
3374 }
3375 }
3376 trace!("0-RTT enabled");
3377 self.zero_rtt_enabled = true;
3378 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
3379 }
3380
3381 fn read_crypto(
3382 &mut self,
3383 space: SpaceId,
3384 crypto: &frame::Crypto,
3385 payload_len: usize,
3386 ) -> Result<(), TransportError> {
3387 let expected = if !self.state.is_handshake() {
3388 SpaceId::Data
3389 } else if self.highest_space == SpaceId::Initial {
3390 SpaceId::Initial
3391 } else {
3392 SpaceId::Handshake
3395 };
3396 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
3400
3401 let end = crypto.offset + crypto.data.len() as u64;
3402 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
3403 warn!(
3404 "received new {:?} CRYPTO data when expecting {:?}",
3405 space, expected
3406 );
3407 return Err(TransportError::PROTOCOL_VIOLATION(
3408 "new data at unexpected encryption level",
3409 ));
3410 }
3411
3412 let space = &mut self.spaces[space];
3413 let max = end.saturating_sub(space.crypto_stream.bytes_read());
3414 if max > self.config.crypto_buffer_size as u64 {
3415 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
3416 }
3417
3418 space
3419 .crypto_stream
3420 .insert(crypto.offset, crypto.data.clone(), payload_len);
3421 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
3422 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
3423 if self.crypto.read_handshake(&chunk.bytes)? {
3424 self.events.push_back(Event::HandshakeDataReady);
3425 }
3426 }
3427
3428 Ok(())
3429 }
3430
3431 fn write_crypto(&mut self) {
3432 loop {
3433 let space = self.highest_space;
3434 let mut outgoing = Vec::new();
3435 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
3436 match space {
3437 SpaceId::Initial => {
3438 self.upgrade_crypto(SpaceId::Handshake, crypto);
3439 }
3440 SpaceId::Handshake => {
3441 self.upgrade_crypto(SpaceId::Data, crypto);
3442 }
3443 _ => unreachable!("got updated secrets during 1-RTT"),
3444 }
3445 }
3446 if outgoing.is_empty() {
3447 if space == self.highest_space {
3448 break;
3449 } else {
3450 continue;
3452 }
3453 }
3454 let offset = self.spaces[space].crypto_offset;
3455 let outgoing = Bytes::from(outgoing);
3456 if let Some(hs) = self.state.as_handshake_mut() {
3457 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
3458 hs.client_hello = Some(outgoing.clone());
3459 }
3460 }
3461 self.spaces[space].crypto_offset += outgoing.len() as u64;
3462 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
3463 self.spaces[space].pending.crypto.push_back(frame::Crypto {
3464 offset,
3465 data: outgoing,
3466 });
3467 }
3468 }
3469
3470 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
3472 debug_assert!(
3473 self.spaces[space].crypto.is_none(),
3474 "already reached packet space {space:?}"
3475 );
3476 trace!("{:?} keys ready", space);
3477 if space == SpaceId::Data {
3478 self.next_crypto = Some(
3480 self.crypto
3481 .next_1rtt_keys()
3482 .expect("handshake should be complete"),
3483 );
3484 }
3485
3486 self.spaces[space].crypto = Some(crypto);
3487 debug_assert!(space as usize > self.highest_space as usize);
3488 self.highest_space = space;
3489 if space == SpaceId::Data && self.side.is_client() {
3490 self.zero_rtt_crypto = None;
3492 }
3493 }
3494
3495 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
3496 debug_assert!(space_id != SpaceId::Data);
3497 trace!("discarding {:?} keys", space_id);
3498 if space_id == SpaceId::Initial {
3499 if let ConnectionSide::Client { token, .. } = &mut self.side {
3501 *token = Bytes::new();
3502 }
3503 }
3504 let space = &mut self.spaces[space_id];
3505 space.crypto = None;
3506 let pns = space.for_path(PathId::ZERO);
3507 pns.time_of_last_ack_eliciting_packet = None;
3508 pns.loss_time = None;
3509 pns.loss_probes = 0;
3510 let sent_packets = mem::take(&mut pns.sent_packets);
3511 let path = self.paths.get_mut(&PathId::ZERO).unwrap();
3512 for (_, packet) in sent_packets.into_iter() {
3513 path.data.remove_in_flight(&packet);
3514 }
3515
3516 self.set_loss_detection_timer(now, PathId::ZERO)
3517 }
3518
3519 fn handle_coalesced(
3520 &mut self,
3521 now: Instant,
3522 remote: SocketAddr,
3523 path_id: PathId,
3524 ecn: Option<EcnCodepoint>,
3525 data: BytesMut,
3526 ) {
3527 self.path_data_mut(path_id)
3528 .inc_total_recvd(data.len() as u64);
3529 let mut remaining = Some(data);
3530 let cid_len = self
3531 .local_cid_state
3532 .values()
3533 .map(|cid_state| cid_state.cid_len())
3534 .next()
3535 .expect("one cid_state must exist");
3536 while let Some(data) = remaining {
3537 match PartialDecode::new(
3538 data,
3539 &FixedLengthConnectionIdParser::new(cid_len),
3540 &[self.version],
3541 self.endpoint_config.grease_quic_bit,
3542 ) {
3543 Ok((partial_decode, rest)) => {
3544 remaining = rest;
3545 self.handle_decode(now, remote, path_id, ecn, partial_decode);
3546 }
3547 Err(e) => {
3548 trace!("malformed header: {}", e);
3549 return;
3550 }
3551 }
3552 }
3553 }
3554
3555 fn handle_decode(
3556 &mut self,
3557 now: Instant,
3558 remote: SocketAddr,
3559 path_id: PathId,
3560 ecn: Option<EcnCodepoint>,
3561 partial_decode: PartialDecode,
3562 ) {
3563 let qlog = QlogRecvPacket::new(partial_decode.len());
3564 if let Some(decoded) = packet_crypto::unprotect_header(
3565 partial_decode,
3566 &self.spaces,
3567 self.zero_rtt_crypto.as_ref(),
3568 self.peer_params.stateless_reset_token,
3569 ) {
3570 self.handle_packet(
3571 now,
3572 remote,
3573 path_id,
3574 ecn,
3575 decoded.packet,
3576 decoded.stateless_reset,
3577 qlog,
3578 );
3579 }
3580 }
3581
/// Handles a single packet after header unprotection: decrypts it, filters duplicates,
/// processes its payload and applies the resulting connection state transition.
///
/// `packet` being `None` is treated as an authentication failure, unless
/// `stateless_reset` is set, in which case the connection is reset regardless of the
/// decryption outcome.
fn handle_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    path_id: PathId,
    ecn: Option<EcnCodepoint>,
    packet: Option<Packet>,
    stateless_reset: bool,
    mut qlog: QlogRecvPacket,
) {
    self.stats.udp_rx.ios += 1;
    if let Some(ref packet) = packet {
        trace!(
            "got {:?} packet ({} bytes) from {} using id {}",
            packet.header.space(),
            packet.payload.len() + packet.header_data.len(),
            remote,
            packet.header.dst_cid(),
        );
    }

    if self.is_handshaking() {
        // Only the first path is accepted while the handshake is in progress.
        if path_id != PathId::ZERO {
            debug!(%remote, %path_id, "discarding multipath packet during handshake");
            return;
        }
        // A packet from a different remote is only accepted if the handshake state still
        // allows the server to change its address; the path's remote is then updated.
        if remote != self.path_data_mut(path_id).remote {
            if let Some(hs) = self.state.as_handshake() {
                if hs.allow_server_migration {
                    trace!(?remote, prev = ?self.path_data(path_id).remote, "server migrated to new remote");
                    self.path_data_mut(path_id).remote = remote;
                    self.qlog.emit_tuple_assigned(path_id, remote, now);
                } else {
                    debug!("discarding packet with unexpected remote during handshake");
                    return;
                }
            } else {
                debug!("discarding packet with unexpected remote during handshake");
                return;
            }
        }
    }

    // Snapshot the state so transitions caused by this packet can be detected below.
    let was_closed = self.state.is_closed();
    let was_drained = self.state.is_drained();

    // `Err(None)` stands for "could not authenticate"; `Err(Some(_))` carries an error
    // reported by `decrypt_packet`.
    let decrypted = match packet {
        None => Err(None),
        Some(mut packet) => self
            .decrypt_packet(now, path_id, &mut packet)
            .map(move |number| (packet, number)),
    };
    let result = match decrypted {
        // Checked first, so a stateless reset wins over any decryption outcome.
        _ if stateless_reset => {
            debug!("got stateless reset");
            Err(ConnectionError::Reset)
        }
        Err(Some(e)) => {
            warn!("illegal packet: {}", e);
            Err(e.into())
        }
        Err(None) => {
            debug!("failed to authenticate packet");
            self.authentication_failures += 1;
            // The limit is taken from the keys of the highest space reached so far.
            let integrity_limit = self.spaces[self.highest_space]
                .crypto
                .as_ref()
                .unwrap()
                .packet
                .local
                .integrity_limit();
            if self.authentication_failures > integrity_limit {
                Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
            } else {
                // Below the limit the packet is dropped without further effect.
                return;
            }
        }
        Ok((packet, number)) => {
            qlog.header(&packet.header, number, path_id);
            let span = match number {
                Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                None => trace_span!("recv", space = ?packet.header.space()),
            };
            let _guard = span.enter();

            // Duplicate detection only applies when the packet has a number and a packet
            // number space already exists for this path.
            let dedup = self.spaces[packet.header.space()]
                .path_space_mut(path_id)
                .map(|pns| &mut pns.dedup);
            if number.zip(dedup).is_some_and(|(n, d)| d.insert(n)) {
                debug!("discarding possible duplicate packet");
                self.qlog.emit_packet_received(qlog, now);
                return;
            } else if self.state.is_handshake() && packet.header.is_short() {
                trace!("dropping short packet during handshake");
                self.qlog.emit_packet_received(qlog, now);
                return;
            } else {
                // A server in the handshake state drops Initials whose token differs from
                // the one recorded in the handshake state.
                if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                    if let Some(hs) = self.state.as_handshake() {
                        if self.side.is_server() && token != &hs.expected_token {
                            warn!("discarding Initial with invalid retry token");
                            self.qlog.emit_packet_received(qlog, now);
                            return;
                        }
                    }
                }

                if !self.state.is_closed() {
                    // Only short-header packets carry a spin bit.
                    let spin = match packet.header {
                        Header::Short { spin, .. } => spin,
                        _ => false,
                    };

                    // On the server, make sure path state exists unless the path was
                    // abandoned.
                    if self.side().is_server() && !self.abandoned_paths.contains(&path_id) {
                        self.ensure_path(path_id, remote, now, number);
                    }
                    if self.paths.contains_key(&path_id) {
                        self.on_packet_authenticated(
                            now,
                            packet.header.space(),
                            path_id,
                            ecn,
                            number,
                            spin,
                            packet.header.is_1rtt(),
                        );
                    }
                }

                let res = self
                    .process_decrypted_packet(now, remote, path_id, number, packet, &mut qlog);

                self.qlog.emit_packet_received(qlog, now);
                res
            }
        }
    };

    // Map the processing error onto a state transition.
    if let Err(conn_err) = result {
        match conn_err {
            ConnectionError::ApplicationClosed(reason) => self.state.move_to_closed(reason),
            ConnectionError::ConnectionClosed(reason) => self.state.move_to_closed(reason),
            // A reset or an exceeded AEAD limit moves straight to drained.
            ConnectionError::Reset
            | ConnectionError::TransportError(TransportError {
                code: TransportErrorCode::AEAD_LIMIT_REACHED,
                ..
            }) => {
                self.state.move_to_drained(Some(conn_err));
            }
            ConnectionError::TimedOut => {
                unreachable!("timeouts aren't generated by packet processing");
            }
            ConnectionError::TransportError(err) => {
                debug!("closing connection due to transport error: {}", err);
                self.state.move_to_closed(err);
            }
            ConnectionError::VersionMismatch => {
                self.state.move_to_draining(Some(conn_err));
            }
            ConnectionError::LocallyClosed => {
                unreachable!("LocallyClosed isn't generated by packet processing");
            }
            ConnectionError::CidsExhausted => {
                unreachable!("CidsExhausted isn't generated by packet processing");
            }
        };
    }

    // The connection became closed while handling this packet.
    if !was_closed && self.state.is_closed() {
        self.close_common();
        if !self.state.is_drained() {
            self.set_close_timer(now);
        }
    }
    // The connection became drained while handling this packet.
    if !was_drained && self.state.is_drained() {
        self.endpoint_events.push_back(EndpointEventInner::Drained);
        self.timers
            .stop(Timer::Conn(ConnTimer::Close), self.qlog.with_time(now));
    }

    if matches!(self.state.as_type(), StateType::Closed) {
        // Without path state for `path_id` the comparison falls back to `remote` itself,
        // so `self.close` becomes true; otherwise it is true only when the packet came
        // from the path's recorded remote.
        // NOTE(review): `self.close` presumably requests (re)sending CONNECTION_CLOSE;
        // confirm where it is consumed.
        let path_remote = self
            .paths
            .get(&path_id)
            .map(|p| p.data.remote)
            .unwrap_or(remote);
        self.close = remote == path_remote;
    }
}
3783
/// Processes a decrypted packet according to the current connection state.
///
/// * Unknown path: the packet is discarded.
/// * Established: the payload is processed (Data space) or handled as an early payload.
/// * Closed: frames are only scanned for a close frame, which moves the state to draining.
/// * Draining / Drained: the packet is ignored.
/// * Handshake: dispatches on the header type (Retry, Handshake, Initial, 0-RTT,
///   Version Negotiation).
///
/// # Errors
///
/// Returns the error raised while processing the payload or peer transport parameters,
/// a protocol violation if a server receives a Retry, and
/// `ConnectionError::VersionMismatch` when a Version Negotiation packet does not list
/// our version.
fn process_decrypted_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    path_id: PathId,
    number: Option<u64>,
    packet: Packet,
    qlog: &mut QlogRecvPacket,
) -> Result<(), ConnectionError> {
    if !self.paths.contains_key(&path_id) {
        trace!(%path_id, ?number, "discarding packet for unknown path");
        return Ok(());
    }
    // Every state except Handshake returns from inside this match; `state` is the
    // mutable handshake state used by the header dispatch below.
    let state = match self.state.as_type() {
        StateType::Established => {
            match packet.header.space() {
                SpaceId::Data => {
                    // NOTE(review): assumes Data-space packets always carry a packet
                    // number; `unwrap` panics otherwise.
                    self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?
                }
                _ if packet.header.has_frames() => {
                    self.process_early_payload(now, path_id, packet, qlog)?
                }
                _ => {
                    trace!("discarding unexpected pre-handshake packet");
                }
            }
            return Ok(());
        }
        StateType::Closed => {
            // While closed, frames are only recorded and scanned for a close frame;
            // frames that fail to decode are skipped.
            for result in frame::Iter::new(packet.payload.freeze())? {
                let frame = match result {
                    Ok(frame) => frame,
                    Err(err) => {
                        debug!("frame decoding error: {err:?}");
                        continue;
                    }
                };
                qlog.frame(&frame);

                if let Frame::Padding = frame {
                    continue;
                };

                self.stats.frame_rx.record(&frame);

                if let Frame::Close(_error) = frame {
                    trace!("draining");
                    self.state.move_to_draining(None);
                    break;
                }
            }
            return Ok(());
        }
        StateType::Draining | StateType::Drained => return Ok(()),
        StateType::Handshake => self.state.as_handshake_mut().expect("checked"),
    };

    match packet.header {
        Header::Retry {
            src_cid: rem_cid, ..
        } => {
            debug_assert_eq!(path_id, PathId::ZERO);
            if self.side.is_server() {
                return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
            }

            // Validate the Retry against the currently active remote CID of this path;
            // a missing CID entry counts as invalid.
            let is_valid_retry = self
                .rem_cids
                .get(&path_id)
                .map(|cids| cids.active())
                .map(|orig_dst_cid| {
                    self.crypto.is_valid_retry(
                        orig_dst_cid,
                        &packet.header_data,
                        &packet.payload,
                    )
                })
                .unwrap_or_default();
            // A Retry is only honoured as the first authenticated packet, and its payload
            // must be longer than the 16 trailing bytes that are cut off the token below.
            if self.total_authed_packets > 1
                || packet.payload.len() <= 16
                || !is_valid_retry
            {
                trace!("discarding invalid Retry");
                return Ok(());
            }

            trace!("retrying with CID {}", rem_cid);
            let client_hello = state.client_hello.take().unwrap();
            self.retry_src_cid = Some(rem_cid);
            self.rem_cids
                .get_mut(&path_id)
                .expect("PathId::ZERO not yet abandoned, is_valid_retry would have been false")
                .update_initial_cid(rem_cid);
            self.rem_handshake_cid = rem_cid;

            // Initial packet number 0, if still tracked, is handled as acknowledged.
            let space = &mut self.spaces[SpaceId::Initial];
            if let Some(info) = space.for_path(PathId::ZERO).take(0) {
                self.on_packet_acked(now, PathId::ZERO, info);
            };

            // Drop the old Initial state, then rebuild the space with keys derived from
            // the new CID, the client hello queued again from offset 0, and the packet
            // number sequence carried over from the old space.
            self.discard_space(now, SpaceId::Initial);
            self.spaces[SpaceId::Initial] = {
                let mut space = PacketSpace::new(now, SpaceId::Initial, &mut self.rng);
                space.crypto = Some(self.crypto.initial_keys(rem_cid, self.side.side()));
                space.crypto_offset = client_hello.len() as u64;
                space.for_path(path_id).next_packet_number = self.spaces[SpaceId::Initial]
                    .for_path(path_id)
                    .next_packet_number;
                space.pending.crypto.push_back(frame::Crypto {
                    offset: 0,
                    data: client_hello,
                });
                space
            };

            // Packets already sent in the Data space are removed from in-flight
            // accounting and their retransmittable content is queued again.
            let zero_rtt = mem::take(
                &mut self.spaces[SpaceId::Data]
                    .for_path(PathId::ZERO)
                    .sent_packets,
            );
            for (_, info) in zero_rtt.into_iter() {
                self.paths
                    .get_mut(&PathId::ZERO)
                    .unwrap()
                    .remove_in_flight(&info);
                self.spaces[SpaceId::Data].pending |= info.retransmits;
            }
            self.streams.retransmit_all_for_0rtt();

            // Everything except the trailing 16 bytes of the payload is stored as the
            // client's token.
            let token_len = packet.payload.len() - 16;
            let ConnectionSide::Client { ref mut token, .. } = self.side else {
                unreachable!("we already short-circuited if we're server");
            };
            *token = packet.payload.freeze().split_to(token_len);

            // Restart the handshake from a fresh handshake state.
            self.state = State::handshake(state::Handshake {
                expected_token: Bytes::new(),
                rem_cid_set: false,
                client_hello: None,
                allow_server_migration: true,
            });
            Ok(())
        }
        Header::Long {
            ty: LongType::Handshake,
            src_cid: rem_cid,
            dst_cid: loc_cid,
            ..
        } => {
            debug_assert_eq!(path_id, PathId::ZERO);
            if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }
            self.on_path_validated(path_id);

            self.process_early_payload(now, path_id, packet, qlog)?;
            if self.state.is_closed() {
                return Ok(());
            }

            if self.crypto.is_handshaking() {
                trace!("handshake ongoing");
                return Ok(());
            }

            // From here on the crypto handshake has finished.
            if self.side.is_client() {
                // NOTE(review): 0x6d is presumably the TLS `missing_extension` alert;
                // confirm.
                let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                    TransportError::new(
                        TransportErrorCode::crypto(0x6d),
                        "transport parameters missing".to_owned(),
                    )
                })?;

                if self.has_0rtt() {
                    if !self.crypto.early_data_accepted().unwrap() {
                        debug_assert!(self.side.is_client());
                        debug!("0-RTT rejected");
                        self.accepted_0rtt = false;
                        self.streams.zero_rtt_rejected();

                        // Pending retransmits are dropped rather than re-sent.
                        self.spaces[SpaceId::Data].pending = Retransmits::default();

                        // Forget the rejected packets and release their in-flight bytes.
                        let sent_packets = mem::take(
                            &mut self.spaces[SpaceId::Data].for_path(path_id).sent_packets,
                        );
                        for (_, packet) in sent_packets.into_iter() {
                            self.paths
                                .get_mut(&path_id)
                                .unwrap()
                                .remove_in_flight(&packet);
                        }
                    } else {
                        self.accepted_0rtt = true;
                        params.validate_resumption_from(&self.peer_params)?;
                    }
                }
                if let Some(token) = params.stateless_reset_token {
                    let remote = self.path_data(path_id).remote;
                    self.endpoint_events
                        .push_back(EndpointEventInner::ResetToken(path_id, remote, token));
                }
                self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                self.issue_first_cids(now);
            } else {
                // Server: queue HANDSHAKE_DONE, drop the Handshake space and report the
                // handshake as confirmed.
                self.spaces[SpaceId::Data].pending.handshake_done = true;
                self.discard_space(now, SpaceId::Handshake);
                self.events.push_back(Event::HandshakeConfirmed);
                trace!("handshake confirmed");
            }

            self.events.push_back(Event::Connected);
            self.state.move_to_established();
            trace!("established");

            self.issue_first_path_cids(now);
            Ok(())
        }
        Header::Initial(InitialHeader {
            src_cid: rem_cid,
            dst_cid: loc_cid,
            ..
        }) => {
            debug_assert_eq!(path_id, PathId::ZERO);
            if !state.rem_cid_set {
                // The first Initial fixes the remote CID used for the handshake.
                trace!("switching remote CID to {}", rem_cid);
                let mut state = state.clone();
                self.rem_cids
                    .get_mut(&path_id)
                    .expect("PathId::ZERO not yet abandoned")
                    .update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;
                self.orig_rem_cid = rem_cid;
                state.rem_cid_set = true;
                self.state.move_to_handshake(state);
            } else if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }

            let starting_space = self.highest_space;
            self.process_early_payload(now, path_id, packet, qlog)?;

            // On the server, the first time this packet advances us beyond the Initial
            // space the peer's transport parameters are applied and 0-RTT is set up.
            if self.side.is_server()
                && starting_space == SpaceId::Initial
                && self.highest_space != SpaceId::Initial
            {
                let params = self.crypto.transport_parameters()?.ok_or_else(|| {
                    TransportError::new(
                        TransportErrorCode::crypto(0x6d),
                        "transport parameters missing".to_owned(),
                    )
                })?;
                self.handle_peer_params(params, loc_cid, rem_cid, now)?;
                self.issue_first_cids(now);
                self.init_0rtt(now);
            }
            Ok(())
        }
        Header::Long {
            ty: LongType::ZeroRtt,
            ..
        } => {
            // NOTE(review): assumes 0-RTT packets always carry a packet number.
            self.process_payload(now, remote, path_id, number.unwrap(), packet, qlog)?;
            Ok(())
        }
        Header::VersionNegotiate { .. } => {
            // Ignored once more than one packet has been authenticated.
            if self.total_authed_packets > 1 {
                return Ok(());
            }
            // The payload is read as a list of 4-byte big-endian versions; a trailing
            // chunk shorter than 4 bytes never matches.
            let supported = packet
                .payload
                .chunks(4)
                .any(|x| match <[u8; 4]>::try_from(x) {
                    Ok(version) => self.version == u32::from_be_bytes(version),
                    Err(_) => false,
                });
            // A packet that lists our own version is ignored.
            if supported {
                return Ok(());
            }
            debug!("remote doesn't support our version");
            Err(ConnectionError::VersionMismatch)
        }
        Header::Short { .. } => unreachable!(
            "short packets received during handshake are discarded in handle_packet"
        ),
    }
}
4096
4097 fn process_early_payload(
4099 &mut self,
4100 now: Instant,
4101 path_id: PathId,
4102 packet: Packet,
4103 #[allow(unused)] qlog: &mut QlogRecvPacket,
4104 ) -> Result<(), TransportError> {
4105 debug_assert_ne!(packet.header.space(), SpaceId::Data);
4106 debug_assert_eq!(path_id, PathId::ZERO);
4107 let payload_len = packet.payload.len();
4108 let mut ack_eliciting = false;
4109 for result in frame::Iter::new(packet.payload.freeze())? {
4110 let frame = result?;
4111 qlog.frame(&frame);
4112 let span = match frame {
4113 Frame::Padding => continue,
4114 _ => Some(trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty)),
4115 };
4116
4117 self.stats.frame_rx.record(&frame);
4118
4119 let _guard = span.as_ref().map(|x| x.enter());
4120 ack_eliciting |= frame.is_ack_eliciting();
4121
4122 if frame.is_1rtt() && packet.header.space() != SpaceId::Data {
4124 return Err(TransportError::PROTOCOL_VIOLATION(
4125 "illegal frame type in handshake",
4126 ));
4127 }
4128
4129 match frame {
4130 Frame::Padding | Frame::Ping => {}
4131 Frame::Crypto(frame) => {
4132 self.read_crypto(packet.header.space(), &frame, payload_len)?;
4133 }
4134 Frame::Ack(ack) => {
4135 self.on_ack_received(now, packet.header.space(), ack)?;
4136 }
4137 Frame::PathAck(ack) => {
4138 span.as_ref()
4139 .map(|span| span.record("path", tracing::field::debug(&ack.path_id)));
4140 self.on_path_ack_received(now, packet.header.space(), ack)?;
4141 }
4142 Frame::Close(reason) => {
4143 self.state.move_to_draining(Some(reason.into()));
4144 return Ok(());
4145 }
4146 _ => {
4147 let mut err =
4148 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
4149 err.frame = Some(frame.ty());
4150 return Err(err);
4151 }
4152 }
4153 }
4154
4155 if ack_eliciting {
4156 self.spaces[packet.header.space()]
4158 .for_path(path_id)
4159 .pending_acks
4160 .set_immediate_ack_required();
4161 }
4162
4163 self.write_crypto();
4164 Ok(())
4165 }
4166
4167 fn process_payload(
4169 &mut self,
4170 now: Instant,
4171 remote: SocketAddr,
4172 path_id: PathId,
4173 number: u64,
4174 packet: Packet,
4175 #[allow(unused)] qlog: &mut QlogRecvPacket,
4176 ) -> Result<(), TransportError> {
4177 let payload = packet.payload.freeze();
4178 let mut is_probing_packet = true;
4179 let mut close = None;
4180 let payload_len = payload.len();
4181 let mut ack_eliciting = false;
4182 let mut migration_observed_addr = None;
4185 for result in frame::Iter::new(payload)? {
4186 let frame = result?;
4187 qlog.frame(&frame);
4188 let span = match frame {
4189 Frame::Padding => continue,
4190 _ => trace_span!("frame", ty = %frame.ty(), path = tracing::field::Empty),
4191 };
4192
4193 self.stats.frame_rx.record(&frame);
4194 match &frame {
4197 Frame::Crypto(f) => {
4198 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
4199 }
4200 Frame::Stream(f) => {
4201 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
4202 }
4203 Frame::Datagram(f) => {
4204 trace!(len = f.data.len(), "got datagram frame");
4205 }
4206 f => {
4207 trace!("got frame {:?}", f);
4208 }
4209 }
4210
4211 let _guard = span.enter();
4212 if packet.header.is_0rtt() {
4213 match frame {
4214 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
4215 return Err(TransportError::PROTOCOL_VIOLATION(
4216 "illegal frame type in 0-RTT",
4217 ));
4218 }
4219 _ => {
4220 if frame.is_1rtt() {
4221 return Err(TransportError::PROTOCOL_VIOLATION(
4222 "illegal frame type in 0-RTT",
4223 ));
4224 }
4225 }
4226 }
4227 }
4228 ack_eliciting |= frame.is_ack_eliciting();
4229
4230 match frame {
4232 Frame::Padding
4233 | Frame::PathChallenge(_)
4234 | Frame::PathResponse(_)
4235 | Frame::NewConnectionId(_)
4236 | Frame::ObservedAddr(_) => {}
4237 _ => {
4238 is_probing_packet = false;
4239 }
4240 }
4241
4242 match frame {
4243 Frame::Crypto(frame) => {
4244 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
4245 }
4246 Frame::Stream(frame) => {
4247 if self.streams.received(frame, payload_len)?.should_transmit() {
4248 self.spaces[SpaceId::Data].pending.max_data = true;
4249 }
4250 }
4251 Frame::Ack(ack) => {
4252 self.on_ack_received(now, SpaceId::Data, ack)?;
4253 }
4254 Frame::PathAck(ack) => {
4255 span.record("path", tracing::field::debug(&ack.path_id));
4256 self.on_path_ack_received(now, SpaceId::Data, ack)?;
4257 }
4258 Frame::Padding | Frame::Ping => {}
4259 Frame::Close(reason) => {
4260 close = Some(reason);
4261 }
4262 Frame::PathChallenge(challenge) => {
4263 let path = &mut self
4264 .path_mut(path_id)
4265 .expect("payload is processed only after the path becomes known");
4266 path.path_responses.push(number, challenge.0, remote);
4267 if remote == path.remote {
4268 match self.peer_supports_ack_frequency() {
4278 true => self.immediate_ack(path_id),
4279 false => {
4280 self.ping_path(path_id).ok();
4281 }
4282 }
4283 }
4284 }
4285 Frame::PathResponse(response) => {
4286 let path = self
4287 .paths
4288 .get_mut(&path_id)
4289 .expect("payload is processed only after the path becomes known");
4290
4291 match path.data.challenges_sent.get(&response.0) {
4292 Some(info) if info.remote == remote && path.data.remote == remote => {
4294 let sent_instant = info.sent_instant;
4295 self.timers.stop(
4297 Timer::PerPath(path_id, PathTimer::PathValidation),
4298 self.qlog.with_time(now),
4299 );
4300 self.timers.stop(
4301 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
4302 self.qlog.with_time(now),
4303 );
4304 if !path.data.validated {
4305 trace!("new path validated");
4306 }
4307 self.timers.stop(
4308 Timer::PerPath(path_id, PathTimer::PathOpen),
4309 self.qlog.with_time(now),
4310 );
4311 path.data
4313 .challenges_sent
4314 .retain(|_token, info| info.remote != remote);
4315 path.data.send_new_challenge = false;
4316 path.data.validated = true;
4317
4318 let rtt = now.saturating_duration_since(sent_instant);
4321 path.data.rtt.reset_initial_rtt(rtt);
4322
4323 self.events
4324 .push_back(Event::Path(PathEvent::Opened { id: path_id }));
4325 if !std::mem::replace(&mut path.data.open, true) {
4328 trace!("path opened");
4329 if let Some(observed) = path.data.last_observed_addr_report.as_ref()
4330 {
4331 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4332 id: path_id,
4333 addr: observed.socket_addr(),
4334 }));
4335 }
4336 }
4337 if let Some((_, ref mut prev)) = path.prev {
4338 prev.challenges_sent.clear();
4339 prev.send_new_challenge = false;
4340 }
4341 }
4342 Some(info) if info.remote == remote => {
4344 debug!("Response to off-path PathChallenge!");
4345 path.data
4346 .challenges_sent
4347 .retain(|_token, info| info.remote != remote);
4348 }
4349 Some(info) => {
4351 debug!(%response, from=%remote, expected=%info.remote, "ignoring invalid PATH_RESPONSE")
4352 }
4353 None => debug!(%response, "ignoring invalid PATH_RESPONSE"),
4355 }
4356 }
4357 Frame::MaxData(bytes) => {
4358 self.streams.received_max_data(bytes);
4359 }
4360 Frame::MaxStreamData { id, offset } => {
4361 self.streams.received_max_stream_data(id, offset)?;
4362 }
4363 Frame::MaxStreams { dir, count } => {
4364 self.streams.received_max_streams(dir, count)?;
4365 }
4366 Frame::ResetStream(frame) => {
4367 if self.streams.received_reset(frame)?.should_transmit() {
4368 self.spaces[SpaceId::Data].pending.max_data = true;
4369 }
4370 }
4371 Frame::DataBlocked { offset } => {
4372 debug!(offset, "peer claims to be blocked at connection level");
4373 }
4374 Frame::StreamDataBlocked { id, offset } => {
4375 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
4376 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
4377 return Err(TransportError::STREAM_STATE_ERROR(
4378 "STREAM_DATA_BLOCKED on send-only stream",
4379 ));
4380 }
4381 debug!(
4382 stream = %id,
4383 offset, "peer claims to be blocked at stream level"
4384 );
4385 }
4386 Frame::StreamsBlocked { dir, limit } => {
4387 if limit > MAX_STREAM_COUNT {
4388 return Err(TransportError::FRAME_ENCODING_ERROR(
4389 "unrepresentable stream limit",
4390 ));
4391 }
4392 debug!(
4393 "peer claims to be blocked opening more than {} {} streams",
4394 limit, dir
4395 );
4396 }
4397 Frame::StopSending(frame::StopSending { id, error_code }) => {
4398 if id.initiator() != self.side.side() {
4399 if id.dir() == Dir::Uni {
4400 debug!("got STOP_SENDING on recv-only {}", id);
4401 return Err(TransportError::STREAM_STATE_ERROR(
4402 "STOP_SENDING on recv-only stream",
4403 ));
4404 }
4405 } else if self.streams.is_local_unopened(id) {
4406 return Err(TransportError::STREAM_STATE_ERROR(
4407 "STOP_SENDING on unopened stream",
4408 ));
4409 }
4410 self.streams.received_stop_sending(id, error_code);
4411 }
4412 Frame::RetireConnectionId(frame::RetireConnectionId { path_id, sequence }) => {
4413 if let Some(ref path_id) = path_id {
4414 span.record("path", tracing::field::debug(&path_id));
4415 }
4416 let path_id = path_id.unwrap_or_default();
4417 match self.local_cid_state.get_mut(&path_id) {
4418 None => error!(?path_id, "RETIRE_CONNECTION_ID for unknown path"),
4419 Some(cid_state) => {
4420 let allow_more_cids = cid_state
4421 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
4422
4423 let has_path = !self.abandoned_paths.contains(&path_id);
4427 let allow_more_cids = allow_more_cids && has_path;
4428
4429 self.endpoint_events
4430 .push_back(EndpointEventInner::RetireConnectionId(
4431 now,
4432 path_id,
4433 sequence,
4434 allow_more_cids,
4435 ));
4436 }
4437 }
4438 }
4439 Frame::NewConnectionId(frame) => {
4440 let path_id = if let Some(path_id) = frame.path_id {
4441 if !self.is_multipath_negotiated() {
4442 return Err(TransportError::PROTOCOL_VIOLATION(
4443 "received PATH_NEW_CONNECTION_ID frame when multipath was not negotiated",
4444 ));
4445 }
4446 if path_id > self.local_max_path_id {
4447 return Err(TransportError::PROTOCOL_VIOLATION(
4448 "PATH_NEW_CONNECTION_ID contains path_id exceeding current max",
4449 ));
4450 }
4451 path_id
4452 } else {
4453 PathId::ZERO
4454 };
4455
4456 if self.abandoned_paths.contains(&path_id) {
4457 trace!("ignoring issued CID for abandoned path");
4458 continue;
4459 }
4460 if let Some(ref path_id) = frame.path_id {
4461 span.record("path", tracing::field::debug(&path_id));
4462 }
4463 let rem_cids = self
4464 .rem_cids
4465 .entry(path_id)
4466 .or_insert_with(|| CidQueue::new(frame.id));
4467 if rem_cids.active().is_empty() {
4468 return Err(TransportError::PROTOCOL_VIOLATION(
4470 "NEW_CONNECTION_ID when CIDs aren't in use",
4471 ));
4472 }
4473 if frame.retire_prior_to > frame.sequence {
4474 return Err(TransportError::PROTOCOL_VIOLATION(
4475 "NEW_CONNECTION_ID retiring unissued CIDs",
4476 ));
4477 }
4478
4479 use crate::cid_queue::InsertError;
4480 match rem_cids.insert(frame) {
4481 Ok(None) => {}
4482 Ok(Some((retired, reset_token))) => {
4483 let pending_retired =
4484 &mut self.spaces[SpaceId::Data].pending.retire_cids;
4485 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
4488 if (pending_retired.len() as u64)
4491 .saturating_add(retired.end.saturating_sub(retired.start))
4492 > MAX_PENDING_RETIRED_CIDS
4493 {
4494 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
4495 "queued too many retired CIDs",
4496 ));
4497 }
4498 pending_retired.extend(retired.map(|seq| (path_id, seq)));
4499 self.set_reset_token(path_id, remote, reset_token);
4500 }
4501 Err(InsertError::ExceedsLimit) => {
4502 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
4503 }
4504 Err(InsertError::Retired) => {
4505 trace!("discarding already-retired");
4506 self.spaces[SpaceId::Data]
4510 .pending
4511 .retire_cids
4512 .push((path_id, frame.sequence));
4513 continue;
4514 }
4515 };
4516
4517 if self.side.is_server()
4518 && path_id == PathId::ZERO
4519 && self
4520 .rem_cids
4521 .get(&PathId::ZERO)
4522 .map(|cids| cids.active_seq() == 0)
4523 .unwrap_or_default()
4524 {
4525 self.update_rem_cid(PathId::ZERO);
4528 }
4529 }
4530 Frame::NewToken(NewToken { token }) => {
4531 let ConnectionSide::Client {
4532 token_store,
4533 server_name,
4534 ..
4535 } = &self.side
4536 else {
4537 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
4538 };
4539 if token.is_empty() {
4540 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
4541 }
4542 trace!("got new token");
4543 token_store.insert(server_name, token);
4544 }
4545 Frame::Datagram(datagram) => {
4546 if self
4547 .datagrams
4548 .received(datagram, &self.config.datagram_receive_buffer_size)?
4549 {
4550 self.events.push_back(Event::DatagramReceived);
4551 }
4552 }
4553 Frame::AckFrequency(ack_frequency) => {
4554 if !self.ack_frequency.ack_frequency_received(&ack_frequency)? {
4557 continue;
4560 }
4561
4562 for (path_id, space) in self.spaces[SpaceId::Data].number_spaces.iter_mut() {
4564 space.pending_acks.set_ack_frequency_params(&ack_frequency);
4565
4566 if let Some(timeout) = space
4569 .pending_acks
4570 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
4571 {
4572 self.timers.set(
4573 Timer::PerPath(*path_id, PathTimer::MaxAckDelay),
4574 timeout,
4575 self.qlog.with_time(now),
4576 );
4577 }
4578 }
4579 }
4580 Frame::ImmediateAck => {
4581 for pns in self.spaces[SpaceId::Data].iter_paths_mut() {
4583 pns.pending_acks.set_immediate_ack_required();
4584 }
4585 }
4586 Frame::HandshakeDone => {
4587 if self.side.is_server() {
4588 return Err(TransportError::PROTOCOL_VIOLATION(
4589 "client sent HANDSHAKE_DONE",
4590 ));
4591 }
4592 if self.spaces[SpaceId::Handshake].crypto.is_some() {
4593 self.discard_space(now, SpaceId::Handshake);
4594 }
4595 self.events.push_back(Event::HandshakeConfirmed);
4596 trace!("handshake confirmed");
4597 }
4598 Frame::ObservedAddr(observed) => {
4599 trace!(seq_no = %observed.seq_no, ip = %observed.ip, port = observed.port);
4601 if !self
4602 .peer_params
4603 .address_discovery_role
4604 .should_report(&self.config.address_discovery_role)
4605 {
4606 return Err(TransportError::PROTOCOL_VIOLATION(
4607 "received OBSERVED_ADDRESS frame when not negotiated",
4608 ));
4609 }
4610 if packet.header.space() != SpaceId::Data {
4612 return Err(TransportError::PROTOCOL_VIOLATION(
4613 "OBSERVED_ADDRESS frame outside data space",
4614 ));
4615 }
4616
4617 let path = self.path_data_mut(path_id);
4618 if remote == path.remote {
4619 if let Some(updated) = path.update_observed_addr_report(observed) {
4620 if path.open {
4621 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4622 id: path_id,
4623 addr: updated,
4624 }));
4625 }
4626 }
4628 } else {
4629 migration_observed_addr = Some(observed)
4631 }
4632 }
4633 Frame::PathAbandon(frame::PathAbandon {
4634 path_id,
4635 error_code,
4636 }) => {
4637 span.record("path", tracing::field::debug(&path_id));
4638 let already_abandoned = match self.close_path(now, path_id, error_code.into()) {
4640 Ok(()) => {
4641 trace!("peer abandoned path");
4642 false
4643 }
4644 Err(ClosePathError::LastOpenPath) => {
4645 trace!("peer abandoned last path, closing connection");
4646 return Err(TransportError::NO_ERROR("last path abandoned by peer"));
4648 }
4649 Err(ClosePathError::ClosedPath) => {
4650 trace!("peer abandoned already closed path");
4651 true
4652 }
4653 };
4654 if self.path(path_id).is_some() && !already_abandoned {
4659 let delay = self.pto(SpaceId::Data, path_id) * 3;
4664 self.timers.set(
4665 Timer::PerPath(path_id, PathTimer::DiscardPath),
4666 now + delay,
4667 self.qlog.with_time(now),
4668 );
4669 }
4670 }
4671 Frame::PathStatusAvailable(info) => {
4672 span.record("path", tracing::field::debug(&info.path_id));
4673 if self.is_multipath_negotiated() {
4674 self.on_path_status(
4675 info.path_id,
4676 PathStatus::Available,
4677 info.status_seq_no,
4678 );
4679 } else {
4680 return Err(TransportError::PROTOCOL_VIOLATION(
4681 "received PATH_STATUS_AVAILABLE frame when multipath was not negotiated",
4682 ));
4683 }
4684 }
4685 Frame::PathStatusBackup(info) => {
4686 span.record("path", tracing::field::debug(&info.path_id));
4687 if self.is_multipath_negotiated() {
4688 self.on_path_status(info.path_id, PathStatus::Backup, info.status_seq_no);
4689 } else {
4690 return Err(TransportError::PROTOCOL_VIOLATION(
4691 "received PATH_STATUS_BACKUP frame when multipath was not negotiated",
4692 ));
4693 }
4694 }
4695 Frame::MaxPathId(frame::MaxPathId(path_id)) => {
4696 span.record("path", tracing::field::debug(&path_id));
4697 if !self.is_multipath_negotiated() {
4698 return Err(TransportError::PROTOCOL_VIOLATION(
4699 "received MAX_PATH_ID frame when multipath was not negotiated",
4700 ));
4701 }
4702 if path_id > self.remote_max_path_id {
4704 self.remote_max_path_id = path_id;
4705 self.issue_first_path_cids(now);
4706 }
4707 }
4708 Frame::PathsBlocked(frame::PathsBlocked(max_path_id)) => {
4709 if self.is_multipath_negotiated() {
4713 if self.local_max_path_id > max_path_id {
4714 return Err(TransportError::PROTOCOL_VIOLATION(
4715 "PATHS_BLOCKED maximum path identifier was larger than local maximum",
4716 ));
4717 }
4718 debug!("received PATHS_BLOCKED({:?})", max_path_id);
4719 } else {
4721 return Err(TransportError::PROTOCOL_VIOLATION(
4722 "received PATHS_BLOCKED frame when not multipath was not negotiated",
4723 ));
4724 }
4725 }
4726 Frame::PathCidsBlocked(frame::PathCidsBlocked { path_id, next_seq }) => {
4727 if self.is_multipath_negotiated() {
4735 if path_id > self.local_max_path_id {
4736 return Err(TransportError::PROTOCOL_VIOLATION(
4737 "PATH_CIDS_BLOCKED path identifier was larger than local maximum",
4738 ));
4739 }
4740 if next_seq.0
4741 > self
4742 .local_cid_state
4743 .get(&path_id)
4744 .map(|cid_state| cid_state.active_seq().1 + 1)
4745 .unwrap_or_default()
4746 {
4747 return Err(TransportError::PROTOCOL_VIOLATION(
4748 "PATH_CIDS_BLOCKED next sequence number larger than in local state",
4749 ));
4750 }
4751 debug!(%path_id, %next_seq, "received PATH_CIDS_BLOCKED");
4752 } else {
4753 return Err(TransportError::PROTOCOL_VIOLATION(
4754 "received PATH_CIDS_BLOCKED frame when not multipath was not negotiated",
4755 ));
4756 }
4757 }
4758 Frame::AddAddress(addr) => {
4759 let client_state = match self.iroh_hp.client_side_mut() {
4760 Ok(state) => state,
4761 Err(err) => {
4762 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4763 "Nat traversal(ADD_ADDRESS): {err}"
4764 )));
4765 }
4766 };
4767
4768 if !client_state.check_remote_address(&addr) {
4769 warn!(?addr, "server sent illegal ADD_ADDRESS frame");
4771 }
4772
4773 match client_state.add_remote_address(addr) {
4774 Ok(maybe_added) => {
4775 if let Some(added) = maybe_added {
4776 self.events.push_back(Event::NatTraversal(
4777 iroh_hp::Event::AddressAdded(added),
4778 ));
4779 }
4780 }
4781 Err(e) => {
4782 warn!(%e, "failed to add remote address")
4783 }
4784 }
4785 }
4786 Frame::RemoveAddress(addr) => {
4787 let client_state = match self.iroh_hp.client_side_mut() {
4788 Ok(state) => state,
4789 Err(err) => {
4790 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4791 "Nat traversal(REMOVE_ADDRESS): {err}"
4792 )));
4793 }
4794 };
4795 if let Some(removed_addr) = client_state.remove_remote_address(addr) {
4796 self.events
4797 .push_back(Event::NatTraversal(iroh_hp::Event::AddressRemoved(
4798 removed_addr,
4799 )));
4800 }
4801 }
4802 Frame::ReachOut(reach_out) => {
4803 let server_state = match self.iroh_hp.server_side_mut() {
4804 Ok(state) => state,
4805 Err(err) => {
4806 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4807 "Nat traversal(REACH_OUT): {err}"
4808 )));
4809 }
4810 };
4811
4812 if let Err(err) = server_state.handle_reach_out(reach_out) {
4813 return Err(TransportError::PROTOCOL_VIOLATION(format!(
4814 "Nat traversal(REACH_OUT): {err}"
4815 )));
4816 }
4817 }
4818 }
4819 }
4820
4821 let space = self.spaces[SpaceId::Data].for_path(path_id);
4822 if space
4823 .pending_acks
4824 .packet_received(now, number, ack_eliciting, &space.dedup)
4825 {
4826 if self.abandoned_paths.contains(&path_id) {
4827 space.pending_acks.set_immediate_ack_required();
4830 } else {
4831 self.timers.set(
4832 Timer::PerPath(path_id, PathTimer::MaxAckDelay),
4833 now + self.ack_frequency.max_ack_delay,
4834 self.qlog.with_time(now),
4835 );
4836 }
4837 }
4838
4839 let pending = &mut self.spaces[SpaceId::Data].pending;
4844 self.streams.queue_max_stream_id(pending);
4845
4846 if let Some(reason) = close {
4847 self.state.move_to_draining(Some(reason.into()));
4848 self.close = true;
4849 }
4850
4851 if Some(number) == self.spaces[SpaceId::Data].for_path(path_id).rx_packet
4852 && !is_probing_packet
4853 && remote != self.path_data(path_id).remote
4854 {
4855 let ConnectionSide::Server { ref server_config } = self.side else {
4856 panic!("packets from unknown remote should be dropped by clients");
4857 };
4858 debug_assert!(
4859 server_config.migration,
4860 "migration-initiating packets should have been dropped immediately"
4861 );
4862 self.migrate(path_id, now, remote, migration_observed_addr);
4863 self.update_rem_cid(path_id);
4865 self.spin = false;
4866 }
4867
4868 Ok(())
4869 }
4870
4871 fn migrate(
4872 &mut self,
4873 path_id: PathId,
4874 now: Instant,
4875 remote: SocketAddr,
4876 observed_addr: Option<ObservedAddr>,
4877 ) {
4878 trace!(%remote, %path_id, "migration initiated");
4879 self.path_counter = self.path_counter.wrapping_add(1);
4880 let prev_pto = self.pto(SpaceId::Data, path_id);
4887 let known_path = self.paths.get_mut(&path_id).expect("known path");
4888 let path = &mut known_path.data;
4889 let mut new_path = if remote.is_ipv4() && remote.ip() == path.remote.ip() {
4890 PathData::from_previous(remote, path, self.path_counter, now)
4891 } else {
4892 let peer_max_udp_payload_size =
4893 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
4894 .unwrap_or(u16::MAX);
4895 PathData::new(
4896 remote,
4897 self.allow_mtud,
4898 Some(peer_max_udp_payload_size),
4899 self.path_counter,
4900 now,
4901 &self.config,
4902 )
4903 };
4904 new_path.last_observed_addr_report = path.last_observed_addr_report.clone();
4905 if let Some(report) = observed_addr {
4906 if let Some(updated) = new_path.update_observed_addr_report(report) {
4907 tracing::info!("adding observed addr event from migration");
4908 self.events.push_back(Event::Path(PathEvent::ObservedAddr {
4909 id: path_id,
4910 addr: updated,
4911 }));
4912 }
4913 }
4914 new_path.send_new_challenge = true;
4915
4916 let mut prev = mem::replace(path, new_path);
4917 if !prev.is_validating_path() {
4919 prev.send_new_challenge = true;
4920 known_path.prev = Some((self.rem_cids.get(&path_id).unwrap().active(), prev));
4924 }
4925
4926 self.qlog.emit_tuple_assigned(path_id, remote, now);
4928
4929 self.timers.set(
4930 Timer::PerPath(path_id, PathTimer::PathValidation),
4931 now + 3 * cmp::max(self.pto(SpaceId::Data, path_id), prev_pto),
4932 self.qlog.with_time(now),
4933 );
4934 }
4935
4936 pub fn local_address_changed(&mut self) {
4938 self.update_rem_cid(PathId::ZERO);
4940 self.ping();
4941 }
4942
4943 fn update_rem_cid(&mut self, path_id: PathId) {
4945 let Some((reset_token, retired)) =
4946 self.rem_cids.get_mut(&path_id).and_then(|cids| cids.next())
4947 else {
4948 return;
4949 };
4950
4951 self.spaces[SpaceId::Data]
4953 .pending
4954 .retire_cids
4955 .extend(retired.map(|seq| (path_id, seq)));
4956 let remote = self.path_data(path_id).remote;
4957 self.set_reset_token(path_id, remote, reset_token);
4958 }
4959
4960 fn set_reset_token(&mut self, path_id: PathId, remote: SocketAddr, reset_token: ResetToken) {
4969 self.endpoint_events
4970 .push_back(EndpointEventInner::ResetToken(path_id, remote, reset_token));
4971
4972 if path_id == PathId::ZERO {
4978 self.peer_params.stateless_reset_token = Some(reset_token);
4979 }
4980 }
4981
4982 fn issue_first_cids(&mut self, now: Instant) {
4984 if self
4985 .local_cid_state
4986 .get(&PathId::ZERO)
4987 .expect("PathId::ZERO exists when the connection is created")
4988 .cid_len()
4989 == 0
4990 {
4991 return;
4992 }
4993
4994 let mut n = self.peer_params.issue_cids_limit() - 1;
4996 if let ConnectionSide::Server { server_config } = &self.side {
4997 if server_config.has_preferred_address() {
4998 n -= 1;
5000 }
5001 }
5002 self.endpoint_events
5003 .push_back(EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n));
5004 }
5005
5006 fn issue_first_path_cids(&mut self, now: Instant) {
5010 if let Some(max_path_id) = self.max_path_id() {
5011 let mut path_id = self.max_path_id_with_cids.next();
5012 while path_id <= max_path_id {
5013 self.endpoint_events
5014 .push_back(EndpointEventInner::NeedIdentifiers(
5015 path_id,
5016 now,
5017 self.peer_params.issue_cids_limit(),
5018 ));
5019 path_id = path_id.next();
5020 }
5021 self.max_path_id_with_cids = max_path_id;
5022 }
5023 }
5024
5025 fn populate_packet(
5033 &mut self,
5034 now: Instant,
5035 space_id: SpaceId,
5036 path_id: PathId,
5037 path_exclusive_only: bool,
5038 buf: &mut impl BufMut,
5039 pn: u64,
5040 #[allow(unused)] qlog: &mut QlogSentPacket,
5041 ) -> SentFrames {
5042 let mut sent = SentFrames::default();
5043 let is_multipath_negotiated = self.is_multipath_negotiated();
5044 let space = &mut self.spaces[space_id];
5045 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5046 let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
5047 space
5048 .for_path(path_id)
5049 .pending_acks
5050 .maybe_ack_non_eliciting();
5051
5052 if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
5054 trace!("HANDSHAKE_DONE");
5055 buf.write(frame::FrameType::HANDSHAKE_DONE);
5056 qlog.frame(&Frame::HandshakeDone);
5057 sent.retransmits.get_or_create().handshake_done = true;
5058 self.stats.frame_tx.handshake_done =
5060 self.stats.frame_tx.handshake_done.saturating_add(1);
5061 }
5062
5063 if let Some((round, addresses)) = space.pending.reach_out.as_mut() {
5066 while let Some(local_addr) = addresses.pop() {
5067 let reach_out = frame::ReachOut::new(*round, local_addr);
5068 if buf.remaining_mut() > reach_out.size() {
5069 trace!(%round, ?local_addr, "REACH_OUT");
5070 reach_out.write(buf);
5071 let sent_reachouts = sent
5072 .retransmits
5073 .get_or_create()
5074 .reach_out
5075 .get_or_insert_with(|| (*round, Default::default()));
5076 sent_reachouts.1.push(local_addr);
5077 self.stats.frame_tx.reach_out = self.stats.frame_tx.reach_out.saturating_add(1);
5078 qlog.frame(&Frame::ReachOut(reach_out));
5079 } else {
5080 addresses.push(local_addr);
5081 break;
5082 }
5083 }
5084 if addresses.is_empty() {
5085 space.pending.reach_out = None;
5086 }
5087 }
5088
5089 if !path_exclusive_only
5091 && space_id == SpaceId::Data
5092 && self
5093 .config
5094 .address_discovery_role
5095 .should_report(&self.peer_params.address_discovery_role)
5096 && (!path.observed_addr_sent || space.pending.observed_addr)
5097 {
5098 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5099 if buf.remaining_mut() > frame.size() {
5100 trace!(seq = %frame.seq_no, ip = %frame.ip, port = frame.port, "OBSERVED_ADDRESS");
5101 frame.write(buf);
5102
5103 self.next_observed_addr_seq_no = self.next_observed_addr_seq_no.saturating_add(1u8);
5104 path.observed_addr_sent = true;
5105
5106 self.stats.frame_tx.observed_addr += 1;
5107 sent.retransmits.get_or_create().observed_addr = true;
5108 space.pending.observed_addr = false;
5109 qlog.frame(&Frame::ObservedAddr(frame));
5110 }
5111 }
5112
5113 if mem::replace(&mut space.for_path(path_id).ping_pending, false) {
5115 trace!("PING");
5116 buf.write(frame::FrameType::PING);
5117 sent.non_retransmits = true;
5118 self.stats.frame_tx.ping += 1;
5119 qlog.frame(&Frame::Ping);
5120 }
5121
5122 if mem::replace(&mut space.for_path(path_id).immediate_ack_pending, false) {
5124 debug_assert_eq!(
5125 space_id,
5126 SpaceId::Data,
5127 "immediate acks must be sent in the data space"
5128 );
5129 trace!("IMMEDIATE_ACK");
5130 buf.write(frame::FrameType::IMMEDIATE_ACK);
5131 sent.non_retransmits = true;
5132 self.stats.frame_tx.immediate_ack += 1;
5133 qlog.frame(&Frame::ImmediateAck);
5134 }
5135
5136 if !path_exclusive_only {
5140 for path_id in space
5141 .number_spaces
5142 .iter_mut()
5143 .filter(|(_, pns)| pns.pending_acks.can_send())
5144 .map(|(&path_id, _)| path_id)
5145 .collect::<Vec<_>>()
5146 {
5147 Self::populate_acks(
5148 now,
5149 self.receiving_ecn,
5150 &mut sent,
5151 path_id,
5152 space_id,
5153 space,
5154 is_multipath_negotiated,
5155 buf,
5156 &mut self.stats,
5157 qlog,
5158 );
5159 }
5160 }
5161
5162 if !path_exclusive_only && mem::replace(&mut space.pending.ack_frequency, false) {
5164 let sequence_number = self.ack_frequency.next_sequence_number();
5165
5166 let config = self.config.ack_frequency_config.as_ref().unwrap();
5168
5169 let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
5171 path.rtt.get(),
5172 config,
5173 &self.peer_params,
5174 );
5175
5176 trace!(?max_ack_delay, "ACK_FREQUENCY");
5177
5178 let frame = frame::AckFrequency {
5179 sequence: sequence_number,
5180 ack_eliciting_threshold: config.ack_eliciting_threshold,
5181 request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
5182 reordering_threshold: config.reordering_threshold,
5183 };
5184 frame.encode(buf);
5185 qlog.frame(&Frame::AckFrequency(frame));
5186
5187 sent.retransmits.get_or_create().ack_frequency = true;
5188
5189 self.ack_frequency
5190 .ack_frequency_sent(path_id, pn, max_ack_delay);
5191 self.stats.frame_tx.ack_frequency += 1;
5192 }
5193
5194 if buf.remaining_mut() > frame::PathChallenge::SIZE_BOUND
5196 && space_id == SpaceId::Data
5197 && path.send_new_challenge
5198 {
5199 path.send_new_challenge = false;
5200
5201 let token = self.rng.random();
5203 let info = paths::SentChallengeInfo {
5204 sent_instant: now,
5205 remote: path.remote,
5206 };
5207 path.challenges_sent.insert(token, info);
5208 sent.non_retransmits = true;
5209 sent.requires_padding = true;
5210 let challenge = frame::PathChallenge(token);
5211 trace!(%challenge, "sending new challenge");
5212 buf.write(challenge);
5213 qlog.frame(&Frame::PathChallenge(challenge));
5214 self.stats.frame_tx.path_challenge += 1;
5215 let pto = self.ack_frequency.max_ack_delay_for_pto() + path.rtt.pto_base();
5216 self.timers.set(
5217 Timer::PerPath(path_id, PathTimer::PathChallengeLost),
5218 now + pto,
5219 self.qlog.with_time(now),
5220 );
5221
5222 if is_multipath_negotiated && !path.validated && path.send_new_challenge {
5223 space.pending.path_status.insert(path_id);
5225 }
5226
5227 if space_id == SpaceId::Data
5230 && self
5231 .config
5232 .address_discovery_role
5233 .should_report(&self.peer_params.address_discovery_role)
5234 {
5235 let frame = frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5236 if buf.remaining_mut() > frame.size() {
5237 frame.write(buf);
5238 qlog.frame(&Frame::ObservedAddr(frame));
5239
5240 self.next_observed_addr_seq_no =
5241 self.next_observed_addr_seq_no.saturating_add(1u8);
5242 path.observed_addr_sent = true;
5243
5244 self.stats.frame_tx.observed_addr += 1;
5245 sent.retransmits.get_or_create().observed_addr = true;
5246 space.pending.observed_addr = false;
5247 }
5248 }
5249 }
5250
5251 if buf.remaining_mut() > frame::PathResponse::SIZE_BOUND && space_id == SpaceId::Data {
5253 if let Some(token) = path.path_responses.pop_on_path(path.remote) {
5254 sent.non_retransmits = true;
5255 sent.requires_padding = true;
5256 let response = frame::PathResponse(token);
5257 trace!(%response, "sending response");
5258 buf.write(response);
5259 qlog.frame(&Frame::PathResponse(response));
5260 self.stats.frame_tx.path_response += 1;
5261
5262 if space_id == SpaceId::Data
5266 && self
5267 .config
5268 .address_discovery_role
5269 .should_report(&self.peer_params.address_discovery_role)
5270 {
5271 let frame =
5272 frame::ObservedAddr::new(path.remote, self.next_observed_addr_seq_no);
5273 if buf.remaining_mut() > frame.size() {
5274 frame.write(buf);
5275 qlog.frame(&Frame::ObservedAddr(frame));
5276
5277 self.next_observed_addr_seq_no =
5278 self.next_observed_addr_seq_no.saturating_add(1u8);
5279 path.observed_addr_sent = true;
5280
5281 self.stats.frame_tx.observed_addr += 1;
5282 sent.retransmits.get_or_create().observed_addr = true;
5283 space.pending.observed_addr = false;
5284 }
5285 }
5286 }
5287 }
5288
5289 while !path_exclusive_only && buf.remaining_mut() > frame::Crypto::SIZE_BOUND && !is_0rtt {
5291 let mut frame = match space.pending.crypto.pop_front() {
5292 Some(x) => x,
5293 None => break,
5294 };
5295
5296 let max_crypto_data_size = buf.remaining_mut()
5301 - 1 - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
5303 - 2; let len = frame
5306 .data
5307 .len()
5308 .min(2usize.pow(14) - 1)
5309 .min(max_crypto_data_size);
5310
5311 let data = frame.data.split_to(len);
5312 let truncated = frame::Crypto {
5313 offset: frame.offset,
5314 data,
5315 };
5316 trace!(
5317 "CRYPTO: off {} len {}",
5318 truncated.offset,
5319 truncated.data.len()
5320 );
5321 truncated.encode(buf);
5322 self.stats.frame_tx.crypto += 1;
5323
5324 #[cfg(feature = "qlog")]
5326 qlog.frame(&Frame::Crypto(truncated.clone()));
5327 sent.retransmits.get_or_create().crypto.push_back(truncated);
5328 if !frame.data.is_empty() {
5329 frame.offset += len as u64;
5330 space.pending.crypto.push_front(frame);
5331 }
5332 }
5333
5334 while !path_exclusive_only
5337 && space_id == SpaceId::Data
5338 && frame::PathAbandon::SIZE_BOUND <= buf.remaining_mut()
5339 {
5340 let Some((path_id, error_code)) = space.pending.path_abandon.pop_first() else {
5341 break;
5342 };
5343 let frame = frame::PathAbandon {
5344 path_id,
5345 error_code,
5346 };
5347 frame.encode(buf);
5348 qlog.frame(&Frame::PathAbandon(frame));
5349 self.stats.frame_tx.path_abandon += 1;
5350 trace!(%path_id, "PATH_ABANDON");
5351 sent.retransmits
5352 .get_or_create()
5353 .path_abandon
5354 .entry(path_id)
5355 .or_insert(error_code);
5356 }
5357
5358 while !path_exclusive_only
5360 && space_id == SpaceId::Data
5361 && frame::PathStatusAvailable::SIZE_BOUND <= buf.remaining_mut()
5362 {
5363 let Some(path_id) = space.pending.path_status.pop_first() else {
5364 break;
5365 };
5366 let Some(path) = self.paths.get(&path_id).map(|path_state| &path_state.data) else {
5367 trace!(%path_id, "discarding queued path status for unknown path");
5368 continue;
5369 };
5370
5371 let seq = path.status.seq();
5372 sent.retransmits.get_or_create().path_status.insert(path_id);
5373 match path.local_status() {
5374 PathStatus::Available => {
5375 let frame = frame::PathStatusAvailable {
5376 path_id,
5377 status_seq_no: seq,
5378 };
5379 frame.encode(buf);
5380 qlog.frame(&Frame::PathStatusAvailable(frame));
5381 self.stats.frame_tx.path_status_available += 1;
5382 trace!(%path_id, %seq, "PATH_STATUS_AVAILABLE")
5383 }
5384 PathStatus::Backup => {
5385 let frame = frame::PathStatusBackup {
5386 path_id,
5387 status_seq_no: seq,
5388 };
5389 frame.encode(buf);
5390 qlog.frame(&Frame::PathStatusBackup(frame));
5391 self.stats.frame_tx.path_status_backup += 1;
5392 trace!(%path_id, %seq, "PATH_STATUS_BACKUP")
5393 }
5394 }
5395 }
5396
5397 if space_id == SpaceId::Data
5399 && space.pending.max_path_id
5400 && frame::MaxPathId::SIZE_BOUND <= buf.remaining_mut()
5401 {
5402 let frame = frame::MaxPathId(self.local_max_path_id);
5403 frame.encode(buf);
5404 qlog.frame(&Frame::MaxPathId(frame));
5405 space.pending.max_path_id = false;
5406 sent.retransmits.get_or_create().max_path_id = true;
5407 trace!(val = %self.local_max_path_id, "MAX_PATH_ID");
5408 self.stats.frame_tx.max_path_id += 1;
5409 }
5410
5411 if space_id == SpaceId::Data
5413 && space.pending.paths_blocked
5414 && frame::PathsBlocked::SIZE_BOUND <= buf.remaining_mut()
5415 {
5416 let frame = frame::PathsBlocked(self.remote_max_path_id);
5417 frame.encode(buf);
5418 qlog.frame(&Frame::PathsBlocked(frame));
5419 space.pending.paths_blocked = false;
5420 sent.retransmits.get_or_create().paths_blocked = true;
5421 trace!(max_path_id = ?self.remote_max_path_id, "PATHS_BLOCKED");
5422 self.stats.frame_tx.paths_blocked += 1;
5423 }
5424
5425 while space_id == SpaceId::Data && frame::PathCidsBlocked::SIZE_BOUND <= buf.remaining_mut()
5427 {
5428 let Some(path_id) = space.pending.path_cids_blocked.pop() else {
5429 break;
5430 };
5431 let next_seq = match self.rem_cids.get(&path_id) {
5432 Some(cid_queue) => cid_queue.active_seq() + 1,
5433 None => 0,
5434 };
5435 let frame = frame::PathCidsBlocked {
5436 path_id,
5437 next_seq: VarInt(next_seq),
5438 };
5439 frame.encode(buf);
5440 qlog.frame(&Frame::PathCidsBlocked(frame));
5441 sent.retransmits
5442 .get_or_create()
5443 .path_cids_blocked
5444 .push(path_id);
5445 trace!(%path_id, next_seq, "PATH_CIDS_BLOCKED");
5446 self.stats.frame_tx.path_cids_blocked += 1;
5447 }
5448
5449 if space_id == SpaceId::Data {
5451 self.streams.write_control_frames(
5452 buf,
5453 &mut space.pending,
5454 &mut sent.retransmits,
5455 &mut self.stats.frame_tx,
5456 qlog,
5457 );
5458 }
5459
5460 let cid_len = self
5462 .local_cid_state
5463 .values()
5464 .map(|cid_state| cid_state.cid_len())
5465 .max()
5466 .expect("some local CID state must exist");
5467 let new_cid_size_bound =
5468 frame::NewConnectionId::size_bound(is_multipath_negotiated, cid_len);
5469 while !path_exclusive_only && buf.remaining_mut() > new_cid_size_bound {
5470 let issued = match space.pending.new_cids.pop() {
5471 Some(x) => x,
5472 None => break,
5473 };
5474 let retire_prior_to = self
5475 .local_cid_state
5476 .get(&issued.path_id)
5477 .map(|cid_state| cid_state.retire_prior_to())
5478 .unwrap_or_else(|| panic!("missing local CID state for path={}", issued.path_id));
5479
5480 let cid_path_id = match is_multipath_negotiated {
5481 true => {
5482 trace!(
5483 path_id = ?issued.path_id,
5484 sequence = issued.sequence,
5485 id = %issued.id,
5486 "PATH_NEW_CONNECTION_ID",
5487 );
5488 self.stats.frame_tx.path_new_connection_id += 1;
5489 Some(issued.path_id)
5490 }
5491 false => {
5492 trace!(
5493 sequence = issued.sequence,
5494 id = %issued.id,
5495 "NEW_CONNECTION_ID"
5496 );
5497 debug_assert_eq!(issued.path_id, PathId::ZERO);
5498 self.stats.frame_tx.new_connection_id += 1;
5499 None
5500 }
5501 };
5502 let frame = frame::NewConnectionId {
5503 path_id: cid_path_id,
5504 sequence: issued.sequence,
5505 retire_prior_to,
5506 id: issued.id,
5507 reset_token: issued.reset_token,
5508 };
5509 frame.encode(buf);
5510 sent.retransmits.get_or_create().new_cids.push(issued);
5511 qlog.frame(&Frame::NewConnectionId(frame));
5512 }
5513
5514 let retire_cid_bound = frame::RetireConnectionId::size_bound(is_multipath_negotiated);
5516 while !path_exclusive_only && buf.remaining_mut() > retire_cid_bound {
5517 let (path_id, sequence) = match space.pending.retire_cids.pop() {
5518 Some((PathId::ZERO, seq)) if !is_multipath_negotiated => {
5519 trace!(sequence = seq, "RETIRE_CONNECTION_ID");
5520 self.stats.frame_tx.retire_connection_id += 1;
5521 (None, seq)
5522 }
5523 Some((path_id, seq)) => {
5524 trace!(%path_id, sequence = seq, "PATH_RETIRE_CONNECTION_ID");
5525 self.stats.frame_tx.path_retire_connection_id += 1;
5526 (Some(path_id), seq)
5527 }
5528 None => break,
5529 };
5530 let frame = frame::RetireConnectionId { path_id, sequence };
5531 frame.encode(buf);
5532 qlog.frame(&Frame::RetireConnectionId(frame));
5533 sent.retransmits
5534 .get_or_create()
5535 .retire_cids
5536 .push((path_id.unwrap_or_default(), sequence));
5537 }
5538
5539 let mut sent_datagrams = false;
5541 while !path_exclusive_only
5542 && buf.remaining_mut() > Datagram::SIZE_BOUND
5543 && space_id == SpaceId::Data
5544 {
5545 let prev_remaining = buf.remaining_mut();
5546 match self.datagrams.write(buf) {
5547 true => {
5548 sent_datagrams = true;
5549 sent.non_retransmits = true;
5550 self.stats.frame_tx.datagram += 1;
5551 qlog.frame_datagram((prev_remaining - buf.remaining_mut()) as u64);
5552 }
5553 false => break,
5554 }
5555 }
5556 if self.datagrams.send_blocked && sent_datagrams {
5557 self.events.push_back(Event::DatagramsUnblocked);
5558 self.datagrams.send_blocked = false;
5559 }
5560
5561 let path = &mut self.paths.get_mut(&path_id).expect("known path").data;
5562
5563 while let Some(remote_addr) = space.pending.new_tokens.pop() {
5565 if path_exclusive_only {
5566 break;
5567 }
5568 debug_assert_eq!(space_id, SpaceId::Data);
5569 let ConnectionSide::Server { server_config } = &self.side else {
5570 panic!("NEW_TOKEN frames should not be enqueued by clients");
5571 };
5572
5573 if remote_addr != path.remote {
5574 continue;
5579 }
5580
5581 let token = Token::new(
5582 TokenPayload::Validation {
5583 ip: remote_addr.ip(),
5584 issued: server_config.time_source.now(),
5585 },
5586 &mut self.rng,
5587 );
5588 let new_token = NewToken {
5589 token: token.encode(&*server_config.token_key).into(),
5590 };
5591
5592 if buf.remaining_mut() < new_token.size() {
5593 space.pending.new_tokens.push(remote_addr);
5594 break;
5595 }
5596
5597 trace!("NEW_TOKEN");
5598 new_token.encode(buf);
5599 qlog.frame(&Frame::NewToken(new_token));
5600 sent.retransmits
5601 .get_or_create()
5602 .new_tokens
5603 .push(remote_addr);
5604 self.stats.frame_tx.new_token += 1;
5605 }
5606
5607 if !path_exclusive_only && space_id == SpaceId::Data {
5609 sent.stream_frames =
5610 self.streams
5611 .write_stream_frames(buf, self.config.send_fairness, qlog);
5612 self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
5613 }
5614
5615 while space_id == SpaceId::Data && frame::AddAddress::SIZE_BOUND <= buf.remaining_mut() {
5618 if let Some(added_address) = space.pending.add_address.pop_last() {
5619 trace!(
5620 seq = %added_address.seq_no,
5621 ip = ?added_address.ip,
5622 port = added_address.port,
5623 "ADD_ADDRESS",
5624 );
5625 added_address.write(buf);
5626 sent.retransmits
5627 .get_or_create()
5628 .add_address
5629 .insert(added_address);
5630 self.stats.frame_tx.add_address = self.stats.frame_tx.add_address.saturating_add(1);
5631 qlog.frame(&Frame::AddAddress(added_address));
5632 } else {
5633 break;
5634 }
5635 }
5636
5637 while space_id == SpaceId::Data && frame::RemoveAddress::SIZE_BOUND <= buf.remaining_mut() {
5639 if let Some(removed_address) = space.pending.remove_address.pop_last() {
5640 trace!(seq = %removed_address.seq_no, "REMOVE_ADDRESS");
5641 removed_address.write(buf);
5642 sent.retransmits
5643 .get_or_create()
5644 .remove_address
5645 .insert(removed_address);
5646 self.stats.frame_tx.remove_address =
5647 self.stats.frame_tx.remove_address.saturating_add(1);
5648 qlog.frame(&Frame::RemoveAddress(removed_address));
5649 } else {
5650 break;
5651 }
5652 }
5653
5654 sent
5655 }
5656
5657 fn populate_acks(
5659 now: Instant,
5660 receiving_ecn: bool,
5661 sent: &mut SentFrames,
5662 path_id: PathId,
5663 space_id: SpaceId,
5664 space: &mut PacketSpace,
5665 is_multipath_negotiated: bool,
5666 buf: &mut impl BufMut,
5667 stats: &mut ConnectionStats,
5668 #[allow(unused)] qlog: &mut QlogSentPacket,
5669 ) {
5670 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
5672
5673 debug_assert!(
5674 is_multipath_negotiated || path_id == PathId::ZERO,
5675 "Only PathId::ZERO allowed without multipath (have {path_id:?})"
5676 );
5677 if is_multipath_negotiated {
5678 debug_assert!(
5679 space_id == SpaceId::Data || path_id == PathId::ZERO,
5680 "path acks must be sent in 1RTT space (have {space_id:?})"
5681 );
5682 }
5683
5684 let pns = space.for_path(path_id);
5685 let ranges = pns.pending_acks.ranges();
5686 debug_assert!(!ranges.is_empty(), "can not send empty ACK range");
5687 let ecn = if receiving_ecn {
5688 Some(&pns.ecn_counters)
5689 } else {
5690 None
5691 };
5692 if let Some(max) = ranges.max() {
5693 sent.largest_acked.insert(path_id, max);
5694 }
5695
5696 let delay_micros = pns.pending_acks.ack_delay(now).as_micros() as u64;
5697 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
5699 let delay = delay_micros >> ack_delay_exp.into_inner();
5700
5701 if is_multipath_negotiated && space_id == SpaceId::Data {
5702 if !ranges.is_empty() {
5703 trace!("PATH_ACK {path_id:?} {ranges:?}, Delay = {delay_micros}us");
5704 frame::PathAck::encode(path_id, delay as _, ranges, ecn, buf);
5705 qlog.frame_path_ack(path_id, delay as _, ranges, ecn);
5706 stats.frame_tx.path_acks += 1;
5707 }
5708 } else {
5709 trace!("ACK {ranges:?}, Delay = {delay_micros}us");
5710 frame::Ack::encode(delay as _, ranges, ecn, buf);
5711 stats.frame_tx.acks += 1;
5712 qlog.frame_ack(delay, ranges, ecn);
5713 }
5714 }
5715
/// Shared step of the close paths visible here: logs that the connection closed and calls
/// `reset` on the connection's timers.
fn close_common(&mut self) {
    trace!("connection closed");
    self.timers.reset();
}
5720
5721 fn set_close_timer(&mut self, now: Instant) {
5722 self.timers.set(
5725 Timer::Conn(ConnTimer::Close),
5726 now + 3 * self.pto_max_path(self.highest_space),
5727 self.qlog.with_time(now),
5728 );
5729 }
5730
5731 fn handle_peer_params(
5736 &mut self,
5737 params: TransportParameters,
5738 loc_cid: ConnectionId,
5739 rem_cid: ConnectionId,
5740 now: Instant,
5741 ) -> Result<(), TransportError> {
5742 if Some(self.orig_rem_cid) != params.initial_src_cid
5743 || (self.side.is_client()
5744 && (Some(self.initial_dst_cid) != params.original_dst_cid
5745 || self.retry_src_cid != params.retry_src_cid))
5746 {
5747 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
5748 "CID authentication failure",
5749 ));
5750 }
5751 if params.initial_max_path_id.is_some() && (loc_cid.is_empty() || rem_cid.is_empty()) {
5752 return Err(TransportError::PROTOCOL_VIOLATION(
5753 "multipath must not use zero-length CIDs",
5754 ));
5755 }
5756
5757 self.set_peer_params(params);
5758 self.qlog.emit_peer_transport_params_received(self, now);
5759
5760 Ok(())
5761 }
5762
5763 fn set_peer_params(&mut self, params: TransportParameters) {
5764 self.streams.set_params(¶ms);
5765 self.idle_timeout =
5766 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
5767 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
5768
5769 if let Some(ref info) = params.preferred_address {
5770 self.rem_cids.get_mut(&PathId::ZERO).expect("not yet abandoned").insert(frame::NewConnectionId {
5772 path_id: None,
5773 sequence: 1,
5774 id: info.connection_id,
5775 reset_token: info.stateless_reset_token,
5776 retire_prior_to: 0,
5777 })
5778 .expect(
5779 "preferred address CID is the first received, and hence is guaranteed to be legal",
5780 );
5781 let remote = self.path_data(PathId::ZERO).remote;
5782 self.set_reset_token(PathId::ZERO, remote, info.stateless_reset_token);
5783 }
5784 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
5785
5786 let mut multipath_enabled = None;
5787 if let (Some(local_max_path_id), Some(remote_max_path_id)) = (
5788 self.config.get_initial_max_path_id(),
5789 params.initial_max_path_id,
5790 ) {
5791 self.local_max_path_id = local_max_path_id;
5793 self.remote_max_path_id = remote_max_path_id;
5794 let initial_max_path_id = local_max_path_id.min(remote_max_path_id);
5795 debug!(%initial_max_path_id, "multipath negotiated");
5796 multipath_enabled = Some(initial_max_path_id);
5797 }
5798
5799 if let Some((max_locally_allowed_remote_addresses, max_remotely_allowed_remote_addresses)) =
5800 self.config
5801 .max_remote_nat_traversal_addresses
5802 .zip(params.max_remote_nat_traversal_addresses)
5803 {
5804 if let Some(max_initial_paths) =
5805 multipath_enabled.map(|path_id| path_id.saturating_add(1u8))
5806 {
5807 let max_local_addresses = max_remotely_allowed_remote_addresses.get();
5808 let max_remote_addresses = max_locally_allowed_remote_addresses.get();
5809 self.iroh_hp =
5810 iroh_hp::State::new(max_remote_addresses, max_local_addresses, self.side());
5811 debug!(
5812 %max_remote_addresses, %max_local_addresses,
5813 "iroh hole punching negotiated"
5814 );
5815
5816 match self.side() {
5817 Side::Client => {
5818 if max_initial_paths.as_u32() < max_remote_addresses as u32 + 1 {
5819 warn!(%max_initial_paths, %max_remote_addresses, "local client configuration might cause nat traversal issues")
5822 } else if max_local_addresses as u64
5823 > params.active_connection_id_limit.into_inner()
5824 {
5825 warn!(%max_local_addresses, remote_cid_limit=%params.active_connection_id_limit.into_inner(), "remote server configuration might cause nat traversal issues")
5829 }
5830 }
5831 Side::Server => {
5832 if (max_initial_paths.as_u32() as u64) < crate::LOC_CID_COUNT {
5833 warn!(%max_initial_paths, local_cid_limit=%crate::LOC_CID_COUNT, "local server configuration might cause nat traversal issues")
5834 }
5835 }
5836 }
5837 } else {
5838 debug!("iroh nat traversal enabled for both endpoints, but multipath is missing")
5839 }
5840 }
5841
5842 self.peer_params = params;
5843 let peer_max_udp_payload_size =
5844 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX);
5845 self.path_data_mut(PathId::ZERO)
5846 .mtud
5847 .on_peer_max_udp_payload_size_received(peer_max_udp_payload_size);
5848 }
5849
5850 fn decrypt_packet(
5852 &mut self,
5853 now: Instant,
5854 path_id: PathId,
5855 packet: &mut Packet,
5856 ) -> Result<Option<u64>, Option<TransportError>> {
5857 let result = packet_crypto::decrypt_packet_body(
5858 packet,
5859 path_id,
5860 &self.spaces,
5861 self.zero_rtt_crypto.as_ref(),
5862 self.key_phase,
5863 self.prev_crypto.as_ref(),
5864 self.next_crypto.as_ref(),
5865 )?;
5866
5867 let result = match result {
5868 Some(r) => r,
5869 None => return Ok(None),
5870 };
5871
5872 if result.outgoing_key_update_acked {
5873 if let Some(prev) = self.prev_crypto.as_mut() {
5874 prev.end_packet = Some((result.number, now));
5875 self.set_key_discard_timer(now, packet.header.space());
5876 }
5877 }
5878
5879 if result.incoming_key_update {
5880 trace!("key update authenticated");
5881 self.update_keys(Some((result.number, now)), true);
5882 self.set_key_discard_timer(now, packet.header.space());
5883 }
5884
5885 Ok(Some(result.number))
5886 }
5887
/// Rotates the 1-RTT packet protection keys to the next key phase.
///
/// The key pair previously held in `next_crypto` becomes the Data-space packet keys, freshly
/// derived keys take its place in `next_crypto`, and the displaced keys are kept in
/// `prev_crypto` together with `end_packet`; `remote` is stored there as `update_unacked`.
/// The per-path `sent_with_keys` counters are reset and `key_phase` is flipped.
///
/// # Panics
///
/// Panics if the next 1-RTT keys cannot be derived, or if the Data space has no crypto state
/// or `next_crypto` is unset.
fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
    trace!("executing key update");
    // Derive the keys that will wait in `next_crypto` after this rotation.
    let new = self
        .crypto
        .next_1rtt_keys()
        .expect("only called for `Data` packets");
    // Packet budget computed from the derived local key's confidentiality limit, reduced by
    // `KEY_UPDATE_MARGIN`.
    self.key_phase_size = new
        .local
        .confidentiality_limit()
        .saturating_sub(KEY_UPDATE_MARGIN);
    // Inner replace: `new` is stored in `next_crypto`, yielding the key pair that was waiting
    // there. Outer replace: that key pair is installed as the Data-space packet keys, and the
    // keys it displaces end up in `old`.
    let old = mem::replace(
        &mut self.spaces[SpaceId::Data]
            .crypto
            .as_mut()
            .unwrap()
            .packet,
        mem::replace(self.next_crypto.as_mut().unwrap(), new),
    );
    // Nothing has been sent with the newly installed keys yet.
    self.spaces[SpaceId::Data]
        .iter_paths_mut()
        .for_each(|s| s.sent_with_keys = 0);
    self.prev_crypto = Some(PrevCrypto {
        crypto: old,
        end_packet,
        update_unacked: remote,
    });
    self.key_phase = !self.key_phase;
}
5919
5920 fn peer_supports_ack_frequency(&self) -> bool {
5921 self.peer_params.min_ack_delay.is_some()
5922 }
5923
5924 pub(crate) fn immediate_ack(&mut self, path_id: PathId) {
5929 debug_assert_eq!(
5930 self.highest_space,
5931 SpaceId::Data,
5932 "immediate ack must be written in the data space"
5933 );
5934 self.spaces[self.highest_space]
5935 .for_path(path_id)
5936 .immediate_ack_pending = true;
5937 }
5938
/// Test helper: decrypts the packet carried by a datagram `event` and returns its payload.
///
/// Returns `None` for non-datagram events and whenever header unprotection or body
/// decryption does not produce a packet.
///
/// # Panics
///
/// Panics if the datagram carries coalesced packets.
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let ConnectionEventInner::Datagram(DatagramConnectionEvent {
        path_id,
        first_decode,
        remaining,
        ..
    }) = &event.0
    else {
        return None;
    };

    assert!(
        remaining.is_none(),
        "Packets should never be coalesced in tests"
    );

    let unprotected = packet_crypto::unprotect_header(
        first_decode.clone(),
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.peer_params.stateless_reset_token,
    )?;
    let mut packet = unprotected.packet?;

    let body = packet_crypto::decrypt_packet_body(
        &mut packet,
        *path_id,
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.key_phase,
        self.prev_crypto.as_ref(),
        self.next_crypto.as_ref(),
    );
    body.ok()?;

    Some(packet.payload.to_vec())
}
5977
/// Test helper: the in-flight byte count of path 0.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    let path = self.path_data(PathId::ZERO);
    path.in_flight.bytes
}
5985
/// Test helper: path 0's congestion window minus its in-flight bytes, saturating at zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    let path = self.path_data(PathId::ZERO);
    let window = path.congestion.window();
    window.saturating_sub(path.in_flight.bytes)
}
5994
/// Test helper: reports whether the connection has nothing scheduled besides going idle.
///
/// Keep-alive, path keep-alive, push-new-CID and key-discard timers are ignored. Of the
/// timers that remain, the earliest must be the idle timer; no remaining timer at all also
/// counts as idle.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let earliest = self
        .timers
        .values()
        .into_iter()
        .filter(|(timer, _)| {
            !matches!(
                timer,
                Timer::Conn(ConnTimer::KeepAlive)
                    | Timer::PerPath(_, PathTimer::PathKeepAlive)
                    | Timer::Conn(ConnTimer::PushNewCid)
                    | Timer::Conn(ConnTimer::KeyDiscard)
            )
        })
        .min_by_key(|(_, time)| *time);

    match earliest {
        None => true,
        Some((timer, _)) => timer == Timer::Conn(ConnTimer::Idle),
    }
}
6013
/// Test helper: the `sending_ecn` flag of path 0.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    let path = self.path_data(PathId::ZERO);
    path.sending_ecn
}
6019
/// Test helper: the `total_recvd` counter of path 0.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    let path = self.path_data(PathId::ZERO);
    path.total_recvd
}
6025
/// Test helper: the `active_seq` pair of the local CID state for path 0.
///
/// # Panics
///
/// Panics if path 0 has no local CID state.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    let cid_state = self.local_cid_state.get(&PathId::ZERO).unwrap();
    cid_state.active_seq()
}
6033
/// Test helper: the `active_seq` pair of the local CID state for the path numbered `path_id`.
///
/// # Panics
///
/// Panics if that path has no local CID state.
#[cfg(test)]
#[track_caller]
pub(crate) fn active_local_path_cid_seq(&self, path_id: u32) -> (u64, u64) {
    let key = PathId(path_id);
    let cid_state = self.local_cid_state.get(&key).unwrap();
    cid_state.active_seq()
}
6042
/// Test helper: assigns retire sequence `v` on path 0's local CID state and queues a
/// `NeedIdentifiers` endpoint event carrying the value that call returned.
///
/// # Panics
///
/// Panics if path 0 has no local CID state.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let cid_state = self.local_cid_state.get_mut(&PathId::ZERO).unwrap();
    let n = cid_state.assign_retire_seq(v);
    let event = EndpointEventInner::NeedIdentifiers(PathId::ZERO, now, n);
    self.endpoint_events.push_back(event);
}
6055
/// Test helper: the active sequence number of path 0's remote CID queue.
///
/// # Panics
///
/// Panics if path 0 has no remote CID queue.
#[cfg(test)]
pub(crate) fn active_rem_cid_seq(&self) -> u64 {
    let cid_queue = self.rem_cids.get(&PathId::ZERO).unwrap();
    cid_queue.active_seq()
}
6061
/// Test helper: the current MTU of the path identified by `path_id`.
#[cfg(test)]
pub(crate) fn path_mtu(&self, path_id: PathId) -> u16 {
    let path = self.path_data(path_id);
    path.current_mtu()
}
6067
/// Test helper: sets `send_new_challenge` on every known path.
#[cfg(test)]
pub(crate) fn trigger_path_validation(&mut self) {
    self.paths
        .values_mut()
        .for_each(|path| path.data.send_new_challenge = true);
}
6075
6076 fn can_send_1rtt(&self, path_id: PathId, max_size: usize) -> SendableFrames {
6087 let path_exclusive = self.paths.get(&path_id).is_some_and(|path| {
6088 path.data.send_new_challenge
6089 || path
6090 .prev
6091 .as_ref()
6092 .is_some_and(|(_, path)| path.send_new_challenge)
6093 || !path.data.path_responses.is_empty()
6094 });
6095 let other = self.streams.can_send_stream_data()
6096 || self
6097 .datagrams
6098 .outgoing
6099 .front()
6100 .is_some_and(|x| x.size(true) <= max_size);
6101 SendableFrames {
6102 acks: false,
6103 other,
6104 close: false,
6105 path_exclusive,
6106 }
6107 }
6108
/// Terminates the connection immediately: runs the common close handling, moves the state
/// to drained with `reason`, and queues a `Drained` endpoint event.
fn kill(&mut self, reason: ConnectionError) {
    self.close_common();
    self.state.move_to_drained(Some(reason));
    self.endpoint_events.push_back(EndpointEventInner::Drained);
}
6115
6116 pub fn current_mtu(&self) -> u16 {
6123 self.paths
6124 .iter()
6125 .filter(|&(path_id, _path_state)| !self.abandoned_paths.contains(path_id))
6126 .map(|(_path_id, path_state)| path_state.data.current_mtu())
6127 .min()
6128 .expect("There is always at least one available path")
6129 }
6130
6131 fn predict_1rtt_overhead(&mut self, pn: u64, path: PathId) -> usize {
6138 let pn_len = PacketNumber::new(
6139 pn,
6140 self.spaces[SpaceId::Data]
6141 .for_path(path)
6142 .largest_acked_packet
6143 .unwrap_or(0),
6144 )
6145 .len();
6146
6147 1 + self
6149 .rem_cids
6150 .get(&path)
6151 .map(|cids| cids.active().len())
6152 .unwrap_or(20) + pn_len
6154 + self.tag_len_1rtt()
6155 }
6156
6157 fn predict_1rtt_overhead_no_pn(&self) -> usize {
6158 let pn_len = 4;
6159
6160 let cid_len = self
6161 .rem_cids
6162 .values()
6163 .map(|cids| cids.active().len())
6164 .max()
6165 .unwrap_or(20); 1 + cid_len + pn_len + self.tag_len_1rtt()
6169 }
6170
6171 fn tag_len_1rtt(&self) -> usize {
6172 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
6173 Some(crypto) => Some(&*crypto.packet.local),
6174 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
6175 };
6176 key.map_or(16, |x| x.tag_len())
6180 }
6181
6182 fn on_path_validated(&mut self, path_id: PathId) {
6184 self.path_data_mut(path_id).validated = true;
6185 let ConnectionSide::Server { server_config } = &self.side else {
6186 return;
6187 };
6188 let remote_addr = self.path_data(path_id).remote;
6189 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
6190 new_tokens.clear();
6191 for _ in 0..server_config.validation_token.sent {
6192 new_tokens.push(remote_addr);
6193 }
6194 }
6195
6196 fn on_path_status(&mut self, path_id: PathId, status: PathStatus, status_seq_no: VarInt) {
6198 if let Some(path) = self.paths.get_mut(&path_id) {
6199 path.data.status.remote_update(status, status_seq_no);
6200 } else {
6201 debug!("PATH_STATUS_AVAILABLE received unknown path {:?}", path_id);
6202 }
6203 self.events.push_back(
6204 PathEvent::RemoteStatus {
6205 id: path_id,
6206 status,
6207 }
6208 .into(),
6209 );
6210 }
6211
6212 fn max_path_id(&self) -> Option<PathId> {
6221 if self.is_multipath_negotiated() {
6222 Some(self.remote_max_path_id.min(self.local_max_path_id))
6223 } else {
6224 None
6225 }
6226 }
6227
6228 pub fn add_nat_traversal_address(&mut self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
6230 if let Some(added) = self.iroh_hp.add_local_address(address)? {
6231 self.spaces[SpaceId::Data].pending.add_address.insert(added);
6232 };
6233 Ok(())
6234 }
6235
6236 pub fn remove_nat_traversal_address(
6240 &mut self,
6241 address: SocketAddr,
6242 ) -> Result<(), iroh_hp::Error> {
6243 if let Some(removed) = self.iroh_hp.remove_local_address(address)? {
6244 self.spaces[SpaceId::Data]
6245 .pending
6246 .remove_address
6247 .insert(removed);
6248 }
6249 Ok(())
6250 }
6251
6252 pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6254 self.iroh_hp.get_local_nat_traversal_addresses()
6255 }
6256
6257 pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6259 Ok(self
6260 .iroh_hp
6261 .client_side()?
6262 .get_remote_nat_traversal_addresses())
6263 }
6264
6265 pub fn initiate_nat_traversal_round(
6273 &mut self,
6274 now: Instant,
6275 ) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
6276 let client_state = self.iroh_hp.client_side_mut()?;
6277 let iroh_hp::NatTraversalRound {
6278 new_round,
6279 reach_out_at,
6280 addresses_to_probe,
6281 prev_round_path_ids,
6282 } = client_state.initiate_nat_traversal_round()?;
6283
6284 self.spaces[SpaceId::Data].pending.reach_out = Some((new_round, reach_out_at));
6285
6286 for path_id in prev_round_path_ids {
6287 let validated = self
6290 .path(path_id)
6291 .map(|path| path.validated)
6292 .unwrap_or(false);
6293
6294 if !validated {
6295 let _ = self.close_path(
6296 now,
6297 path_id,
6298 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
6299 );
6300 }
6301 }
6302
6303 let mut err = None;
6304
6305 let mut path_ids = Vec::with_capacity(addresses_to_probe.len());
6306 let mut probed_addresses = Vec::with_capacity(addresses_to_probe.len());
6307 let ipv6 = self.paths.values().any(|p| p.data.remote.is_ipv6());
6308
6309 for (ip, port) in addresses_to_probe {
6310 let remote = match ip {
6312 IpAddr::V4(addr) if ipv6 => SocketAddr::new(addr.to_ipv6_mapped().into(), port),
6313 IpAddr::V4(addr) => SocketAddr::new(addr.into(), port),
6314 IpAddr::V6(_) if ipv6 => SocketAddr::new(ip, port),
6315 IpAddr::V6(_) => {
6316 trace!("not using IPv6 nat candidate for IPv4 socket");
6317 continue;
6318 }
6319 };
6320 match self.open_path_ensure(remote, PathStatus::Backup, now) {
6321 Ok((path_id, path_was_known)) if !path_was_known => {
6322 path_ids.push(path_id);
6323 probed_addresses.push(remote);
6324 }
6325 Ok((path_id, _)) => {
6326 trace!(%path_id, %remote,"nat traversal: path existed for remote")
6327 }
6328 Err(e) => {
6329 debug!(%remote, %e,"nat traversal: failed to probe remote");
6330 err.get_or_insert(e);
6331 }
6332 }
6333 }
6334
6335 if let Some(err) = err {
6336 if probed_addresses.is_empty() {
6338 return Err(iroh_hp::Error::Multipath(err));
6339 }
6340 }
6341
6342 self.iroh_hp
6343 .client_side_mut()
6344 .expect("connection side validated")
6345 .set_round_path_ids(path_ids);
6346
6347 Ok(probed_addresses)
6348 }
6349}
6350
6351impl fmt::Debug for Connection {
6352 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6353 f.debug_struct("Connection")
6354 .field("handshake_cid", &self.handshake_cid)
6355 .finish()
6356 }
6357}
6358
/// Blocked state of a path.
///
/// NOTE(review): the variant meanings below are read off the names only; the code that
/// produces these values is outside this section, so confirm against it.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum PathBlocked {
    /// The path is not blocked.
    No,
    /// Presumably blocked by the anti-amplification limit.
    AntiAmplification,
    /// Presumably blocked by congestion control.
    Congestion,
    /// Presumably blocked by pacing.
    Pacing,
}
6366
/// Side-specific state of a connection.
enum ConnectionSide {
    /// State held by a client connection.
    Client {
        /// Token bytes; initialised from `token_store.take(&server_name)`, or empty when the
        /// store yields nothing.
        token: Bytes,
        /// Store the initial token was taken from.
        token_store: Arc<dyn TokenStore>,
        /// Server name used as the key into `token_store`.
        server_name: String,
    },
    /// State held by a server connection.
    Server {
        /// Shared server configuration.
        server_config: Arc<ServerConfig>,
    },
}
6379
6380impl ConnectionSide {
6381 fn remote_may_migrate(&self, state: &State) -> bool {
6382 match self {
6383 Self::Server { server_config } => server_config.migration,
6384 Self::Client { .. } => {
6385 if let Some(hs) = state.as_handshake() {
6386 hs.allow_server_migration
6387 } else {
6388 false
6389 }
6390 }
6391 }
6392 }
6393
6394 fn is_client(&self) -> bool {
6395 self.side().is_client()
6396 }
6397
6398 fn is_server(&self) -> bool {
6399 self.side().is_server()
6400 }
6401
6402 fn side(&self) -> Side {
6403 match *self {
6404 Self::Client { .. } => Side::Client,
6405 Self::Server { .. } => Side::Server,
6406 }
6407 }
6408}
6409
6410impl From<SideArgs> for ConnectionSide {
6411 fn from(side: SideArgs) -> Self {
6412 match side {
6413 SideArgs::Client {
6414 token_store,
6415 server_name,
6416 } => Self::Client {
6417 token: token_store.take(&server_name).unwrap_or_default(),
6418 token_store,
6419 server_name,
6420 },
6421 SideArgs::Server {
6422 server_config,
6423 pref_addr_cid: _,
6424 path_validated: _,
6425 } => Self::Server { server_config },
6426 }
6427 }
6428}
6429
/// Side-specific arguments used to construct a [`ConnectionSide`].
pub(crate) enum SideArgs {
    /// Arguments for a client connection.
    Client {
        /// Store from which the initial token is taken.
        token_store: Arc<dyn TokenStore>,
        /// Server name used to look up the token.
        server_name: String,
    },
    /// Arguments for a server connection.
    Server {
        /// Shared server configuration.
        server_config: Arc<ServerConfig>,
        /// Connection ID associated with the preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already considered validated.
        path_validated: bool,
    },
}
6442
6443impl SideArgs {
6444 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
6445 match *self {
6446 Self::Client { .. } => None,
6447 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
6448 }
6449 }
6450
6451 pub(crate) fn path_validated(&self) -> bool {
6452 match *self {
6453 Self::Client { .. } => true,
6454 Self::Server { path_validated, .. } => path_validated,
6455 }
6456 }
6457
6458 pub(crate) fn side(&self) -> Side {
6459 match *self {
6460 Self::Client { .. } => Side::Client,
6461 Self::Server { .. } => Side::Server,
6462 }
6463 }
6464}
6465
/// Reasons why a connection might be lost.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed transparently as the wrapped [`TransportError`].
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The connection was aborted by the peer with a connection-level close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The connection was closed by the peer with an application-level close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed locally.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
6500
6501impl From<Close> for ConnectionError {
6502 fn from(x: Close) -> Self {
6503 match x {
6504 Close::Connection(reason) => Self::ConnectionClosed(reason),
6505 Close::Application(reason) => Self::ApplicationClosed(reason),
6506 }
6507 }
6508}
6509
6510impl From<ConnectionError> for io::Error {
6512 fn from(x: ConnectionError) -> Self {
6513 use ConnectionError::*;
6514 let kind = match x {
6515 TimedOut => io::ErrorKind::TimedOut,
6516 Reset => io::ErrorKind::ConnectionReset,
6517 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
6518 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
6519 io::ErrorKind::Other
6520 }
6521 };
6522 Self::new(kind, x)
6523 }
6524}
6525
6526#[derive(Debug, Error, PartialEq, Eq, Clone, Copy)]
6529pub enum PathError {
6530 #[error("multipath extension not negotiated")]
6532 MultipathNotNegotiated,
6533 #[error("the server side may not open a path")]
6535 ServerSideNotAllowed,
6536 #[error("maximum number of concurrent paths reached")]
6538 MaxPathIdReached,
6539 #[error("remoted CIDs exhausted")]
6541 RemoteCidsExhausted,
6542 #[error("path validation failed")]
6544 ValidationFailed,
6545 #[error("invalid remote address")]
6547 InvalidRemoteAddress(SocketAddr),
6548}
6549
6550#[derive(Debug, Error, Clone, Eq, PartialEq)]
6552pub enum ClosePathError {
6553 #[error("closed path")]
6555 ClosedPath,
6556 #[error("last open path")]
6558 LastOpenPath,
6559}
6560
6561#[derive(Debug, Error, Clone, Copy)]
6562#[error("Multipath extension not negotiated")]
6563pub struct MultipathNotNegotiated {
6564 _private: (),
6565}
6566
6567#[derive(Debug)]
6569pub enum Event {
6570 HandshakeDataReady,
6572 Connected,
6574 HandshakeConfirmed,
6576 ConnectionLost {
6580 reason: ConnectionError,
6582 },
6583 Stream(StreamEvent),
6585 DatagramReceived,
6587 DatagramsUnblocked,
6589 Path(PathEvent),
6591 NatTraversal(iroh_hp::Event),
6593}
6594
6595impl From<PathEvent> for Event {
6596 fn from(source: PathEvent) -> Self {
6597 Self::Path(source)
6598 }
6599}
6600
6601fn get_max_ack_delay(params: &TransportParameters) -> Duration {
6602 Duration::from_micros(params.max_ack_delay.0 * 1000)
6603}
6604
6605const MAX_BACKOFF_EXPONENT: u32 = 16;
6607
6608const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
6616
6617const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
6623 1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
6624
6625const KEY_UPDATE_MARGIN: u64 = 10_000;
6629
6630#[derive(Default)]
6631struct SentFrames {
6632 retransmits: ThinRetransmits,
6633 largest_acked: FxHashMap<PathId, u64>,
6635 stream_frames: StreamMetaVec,
6636 non_retransmits: bool,
6638 requires_padding: bool,
6640}
6641
6642impl SentFrames {
6643 fn is_ack_only(&self, streams: &StreamsState) -> bool {
6645 !self.largest_acked.is_empty()
6646 && !self.non_retransmits
6647 && self.stream_frames.is_empty()
6648 && self.retransmits.is_empty(streams)
6649 }
6650}
6651
6652fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
6660 match (x, y) {
6661 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
6662 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
6663 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
6664 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
6665 }
6666}
6667
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `negotiate_max_idle_timeout` against a table of expected results,
    /// calling it with the two arguments in both orders.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        // Helpers to keep the table compact.
        let limit = |ms| Some(VarInt(ms));
        let timeout = |ms| Some(Duration::from_millis(ms));

        let cases = [
            (None, None, None),
            (None, limit(0), None),
            (None, limit(2), timeout(2)),
            (limit(0), limit(0), None),
            (limit(2), limit(0), timeout(2)),
            (limit(1), limit(4), timeout(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}