1use std::{
2 collections::{HashMap, hash_map},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use bytes::{BufMut, Bytes, BytesMut};
11use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
12use rustc_hash::FxHashMap;
13use slab::Slab;
14use thiserror::Error;
15use tracing::{debug, error, trace, warn};
16
17use crate::{
18 Duration, FourTuple, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, PathId,
19 RESET_TOKEN_SIZE, ResetToken, Side, Transmit, TransportConfig, TransportError,
20 cid_generator::ConnectionIdGenerator,
21 coding::{BufMutExt, Encodable},
22 config::{ClientConfig, EndpointConfig, ServerConfig},
23 connection::{Connection, ConnectionError, SideArgs},
24 crypto::{self, Keys, UnsupportedVersion},
25 frame,
26 packet::{
27 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
28 PacketNumber, PartialDecode, ProtectedInitialHeader,
29 },
30 shared::{
31 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32 EndpointEvent, EndpointEventInner, IssuedCid,
33 },
34 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
35 transport_parameters::{PreferredAddress, TransportParameters},
36};
37
/// Protocol state shared by every connection of one endpoint.
///
/// Owns the routing index that maps received datagrams to connections, the
/// per-connection metadata, and the buffers that hold datagrams for connection
/// attempts which have been surfaced as an [`Incoming`] but not yet decided on.
pub struct Endpoint {
    /// Random number generator, seeded explicitly or from the operating system.
    rng: StdRng,
    /// Lookup tables used to route a datagram to a connection or a pending `Incoming`.
    index: ConnectionIndex,
    /// Metadata for every live connection, indexed by `ConnectionHandle`.
    connections: Slab<ConnectionMeta>,
    /// Generator for locally issued connection IDs.
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    /// Endpoint-wide configuration.
    config: Arc<EndpointConfig>,
    /// Server configuration; when `None`, first packets of unknown connections are not accepted.
    server_config: Option<Arc<ServerConfig>>,
    /// Forwarded unchanged to every `Connection::new` call.
    allow_mtud: bool,
    /// Time at which the most recent stateless reset was sent, for rate limiting.
    last_stateless_reset: Option<Instant>,
    /// Datagrams buffered for each pending `Incoming`, indexed by `Incoming::incoming_idx`.
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of `total_bytes` over all entries of `incoming_buffers`.
    all_incoming_buffers_total_bytes: u64,
}
57
58impl Endpoint {
59 pub fn new(
70 config: Arc<EndpointConfig>,
71 server_config: Option<Arc<ServerConfig>>,
72 allow_mtud: bool,
73 rng_seed: Option<[u8; 32]>,
74 ) -> Self {
75 let rng_seed = rng_seed.or(config.rng_seed);
76 Self {
77 rng: rng_seed.map_or(StdRng::from_os_rng(), StdRng::from_seed),
78 index: ConnectionIndex::default(),
79 connections: Slab::new(),
80 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
81 config,
82 server_config,
83 allow_mtud,
84 last_stateless_reset: None,
85 incoming_buffers: Slab::new(),
86 all_incoming_buffers_total_bytes: 0,
87 }
88 }
89
90 pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
92 self.server_config = server_config;
93 }
94
95 pub fn handle_event(
99 &mut self,
100 ch: ConnectionHandle,
101 event: EndpointEvent,
102 ) -> Option<ConnectionEvent> {
103 use EndpointEventInner::*;
104 match event.0 {
105 NeedIdentifiers(path_id, now, n) => {
106 return Some(self.send_new_identifiers(path_id, now, ch, n));
107 }
108 ResetToken(path_id, remote, token) => {
109 if let Some(old) = self.connections[ch]
110 .reset_token
111 .insert(path_id, (remote, token))
112 {
113 self.index.connection_reset_tokens.remove(old.0, old.1);
114 }
115 if self.index.connection_reset_tokens.insert(remote, token, ch) {
116 warn!("duplicate reset token");
117 }
118 }
119 RetireResetToken(path_id) => {
120 if let Some(old) = self.connections[ch].reset_token.remove(&path_id) {
121 self.index.connection_reset_tokens.remove(old.0, old.1);
122 }
123 }
124 RetireConnectionId(now, path_id, seq, allow_more_cids) => {
125 if let Some(cid) = self.connections[ch]
126 .loc_cids
127 .get_mut(&path_id)
128 .and_then(|pcid| pcid.cids.remove(&seq))
129 {
130 trace!(%path_id, "local CID retired {}: {}", seq, cid);
131 self.index.retire(cid);
132 if allow_more_cids {
133 return Some(self.send_new_identifiers(path_id, now, ch, 1));
134 }
135 }
136 }
137 Drained => {
138 if let Some(conn) = self.connections.try_remove(ch.0) {
139 self.index.remove(&conn);
140 } else {
141 error!(id = ch.0, "unknown connection drained");
145 }
146 }
147 }
148 None
149 }
150
151 pub fn handle(
153 &mut self,
154 now: Instant,
155 network_path: FourTuple,
156 ecn: Option<EcnCodepoint>,
157 data: BytesMut,
158 buf: &mut Vec<u8>,
159 ) -> Option<DatagramEvent> {
160 let datagram_len = data.len();
162 let mut event = match PartialDecode::new(
163 data,
164 &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
165 &self.config.supported_versions,
166 self.config.grease_quic_bit,
167 ) {
168 Ok((first_decode, remaining)) => DatagramConnectionEvent {
169 now,
170 network_path,
171 path_id: PathId::ZERO, ecn,
173 first_decode,
174 remaining,
175 },
176 Err(PacketDecodeError::UnsupportedVersion {
177 src_cid,
178 dst_cid,
179 version,
180 }) => {
181 if self.server_config.is_none() {
182 debug!("dropping packet with unsupported version");
183 return None;
184 }
185 trace!("sending version negotiation");
186 Header::VersionNegotiate {
188 random: self.rng.random::<u8>() | 0x40,
189 src_cid: dst_cid,
190 dst_cid: src_cid,
191 }
192 .encode(buf);
193 buf.write::<u32>(match version {
195 0x0a1a_2a3a => 0x0a1a_2a4a,
196 _ => 0x0a1a_2a3a,
197 });
198 for &version in &self.config.supported_versions {
199 buf.write(version);
200 }
201 return Some(DatagramEvent::Response(Transmit {
202 destination: network_path.remote,
203 ecn: None,
204 size: buf.len(),
205 segment_size: None,
206 src_ip: network_path.local_ip,
207 }));
208 }
209 Err(e) => {
210 trace!("malformed header: {}", e);
211 return None;
212 }
213 };
214
215 let dst_cid = event.first_decode.dst_cid();
216
217 if let Some(route_to) = self.index.get(&network_path, &event.first_decode) {
218 event.path_id = match route_to {
219 RouteDatagramTo::Incoming(_) => PathId::ZERO,
220 RouteDatagramTo::Connection(_, path_id) => path_id,
221 };
222 match route_to {
223 RouteDatagramTo::Incoming(incoming_idx) => {
224 let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
225 let config = &self.server_config.as_ref().unwrap();
226
227 if incoming_buffer
228 .total_bytes
229 .checked_add(datagram_len as u64)
230 .is_some_and(|n| n <= config.incoming_buffer_size)
231 && self
232 .all_incoming_buffers_total_bytes
233 .checked_add(datagram_len as u64)
234 .is_some_and(|n| n <= config.incoming_buffer_size_total)
235 {
236 incoming_buffer.datagrams.push(event);
237 incoming_buffer.total_bytes += datagram_len as u64;
238 self.all_incoming_buffers_total_bytes += datagram_len as u64;
239 }
240
241 None
242 }
243 RouteDatagramTo::Connection(ch, _path_id) => Some(DatagramEvent::ConnectionEvent(
244 ch,
245 ConnectionEvent(ConnectionEventInner::Datagram(event)),
246 )),
247 }
248 } else if event.first_decode.initial_header().is_some() {
249 self.handle_first_packet(datagram_len, event, network_path, buf)
252 } else if event.first_decode.has_long_header() {
253 debug!(
254 "ignoring non-initial packet for unknown connection {}",
255 dst_cid
256 );
257 None
258 } else if !event.first_decode.is_initial()
259 && self.local_cid_generator.validate(dst_cid).is_err()
260 {
261 debug!("dropping packet with invalid CID");
262 None
263 } else if dst_cid.is_empty() {
264 trace!("dropping unrecognized short packet without ID");
265 None
266 } else {
267 self.stateless_reset(now, datagram_len, network_path, dst_cid, buf)
270 .map(DatagramEvent::Response)
271 }
272 }
273
274 fn stateless_reset(
276 &mut self,
277 now: Instant,
278 inciting_dgram_len: usize,
279 network_path: FourTuple,
280 dst_cid: ConnectionId,
281 buf: &mut Vec<u8>,
282 ) -> Option<Transmit> {
283 if self
284 .last_stateless_reset
285 .is_some_and(|last| last + self.config.min_reset_interval > now)
286 {
287 debug!("ignoring unexpected packet within minimum stateless reset interval");
288 return None;
289 }
290
291 const MIN_PADDING_LEN: usize = 5;
293
294 let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
297 Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
298 _ => {
299 debug!(
300 "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
301 inciting_dgram_len
302 );
303 return None;
304 }
305 };
306
307 debug!(
308 "sending stateless reset for {} to {}",
309 dst_cid, network_path.remote
310 );
311 self.last_stateless_reset = Some(now);
312 const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
314 let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
315 max_padding_len
316 } else {
317 self.rng
318 .random_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
319 };
320 buf.reserve(padding_len + RESET_TOKEN_SIZE);
321 buf.resize(padding_len, 0);
322 self.rng.fill_bytes(&mut buf[0..padding_len]);
323 buf[0] = 0b0100_0000 | (buf[0] >> 2);
324 buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
325
326 debug_assert!(buf.len() < inciting_dgram_len);
327
328 Some(Transmit {
329 destination: network_path.remote,
330 ecn: None,
331 size: buf.len(),
332 segment_size: None,
333 src_ip: network_path.local_ip,
334 })
335 }
336
337 pub fn connect(
339 &mut self,
340 now: Instant,
341 config: ClientConfig,
342 remote: SocketAddr,
343 server_name: &str,
344 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
345 if self.cids_exhausted() {
346 return Err(ConnectError::CidsExhausted);
347 }
348 if remote.port() == 0 || remote.ip().is_unspecified() {
349 return Err(ConnectError::InvalidRemoteAddress(remote));
350 }
351 if !self.config.supported_versions.contains(&config.version) {
352 return Err(ConnectError::UnsupportedVersion);
353 }
354
355 let remote_id = (config.initial_dst_cid_provider)();
356 trace!(initial_dcid = %remote_id);
357
358 let ch = ConnectionHandle(self.connections.vacant_key());
359 let loc_cid = self.new_cid(ch, PathId::ZERO);
360 let params = TransportParameters::new(
361 &config.transport,
362 &self.config,
363 self.local_cid_generator.as_ref(),
364 loc_cid,
365 None,
366 &mut self.rng,
367 );
368 let tls = config
369 .crypto
370 .start_session(config.version, server_name, ¶ms)?;
371
372 let conn = self.add_connection(
373 ch,
374 config.version,
375 remote_id,
376 loc_cid,
377 remote_id,
378 FourTuple {
379 remote,
380 local_ip: None,
381 },
382 now,
383 tls,
384 config.transport,
385 SideArgs::Client {
386 token_store: config.token_store,
387 server_name: server_name.into(),
388 },
389 ¶ms,
390 );
391 Ok((ch, conn))
392 }
393
394 fn send_new_identifiers(
396 &mut self,
397 path_id: PathId,
398 now: Instant,
399 ch: ConnectionHandle,
400 num: u64,
401 ) -> ConnectionEvent {
402 let mut ids = vec![];
403 for _ in 0..num {
404 let id = self.new_cid(ch, path_id);
405 let cid_meta = self.connections[ch].loc_cids.entry(path_id).or_default();
406 let sequence = cid_meta.issued;
407 cid_meta.issued += 1;
408 cid_meta.cids.insert(sequence, id);
409 ids.push(IssuedCid {
410 path_id,
411 sequence,
412 id,
413 reset_token: ResetToken::new(&*self.config.reset_key, id),
414 });
415 }
416 ConnectionEvent(ConnectionEventInner::NewIdentifiers(
417 ids,
418 now,
419 self.local_cid_generator.cid_len(),
420 self.local_cid_generator.cid_lifetime(),
421 ))
422 }
423
424 fn new_cid(&mut self, ch: ConnectionHandle, path_id: PathId) -> ConnectionId {
426 loop {
427 let cid = self.local_cid_generator.generate_cid();
428 if cid.is_empty() {
429 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
431 return cid;
432 }
433 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
434 e.insert((ch, path_id));
435 break cid;
436 }
437 }
438 }
439
    /// Handle an Initial packet that did not match any known connection or pending attempt.
    ///
    /// On success a buffer for follow-up datagrams is allocated, the packet's destination
    /// CID is registered so later datagrams reach that buffer, and an `Incoming` is
    /// returned for the application to accept, refuse, retry or ignore. Otherwise the
    /// packet is dropped (`None`) or answered immediately (`DatagramEvent::Response`).
    fn handle_first_packet(
        &mut self,
        datagram_len: usize,
        event: DatagramConnectionEvent,
        network_path: FourTuple,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let dst_cid = event.first_decode.dst_cid();
        // `handle` only calls this after checking `initial_header().is_some()`.
        let header = event.first_decode.initial_header().unwrap();

        // Without a server config this endpoint does not accept connections.
        let Some(server_config) = &self.server_config else {
            debug!("packet for unrecognized connection {}", dst_cid);
            return self
                .stateless_reset(event.now, datagram_len, network_path, dst_cid, buf)
                .map(DatagramEvent::Response);
        };

        if datagram_len < MIN_INITIAL_SIZE as usize {
            debug!("ignoring short initial for connection {}", dst_cid);
            return None;
        }

        // Initial keys are derived from the version and the client-chosen destination CID.
        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
            Ok(keys) => keys,
            Err(UnsupportedVersion) => {
                debug!(
                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
                    header.version
                );
                return None;
            }
        };

        // Checks that need only the protected header; failures are answered with a close.
        if let Err(reason) = self.early_validate_first_packet(header) {
            return Some(DatagramEvent::Response(self.initial_close(
                header.version,
                network_path,
                &crypto,
                header.src_cid,
                reason,
                buf,
            )));
        }

        // Finish decoding the header using the remote header key.
        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
            Ok(packet) => packet,
            Err(e) => {
                trace!("unable to decode initial packet: {}", e);
                return None;
            }
        };

        if !packet.reserved_bits_valid() {
            debug!("dropping connection attempt with invalid reserved bits");
            return None;
        }

        let Header::Initial(header) = packet.header else {
            panic!("non-initial packet in handle_first_packet()");
        };

        // Presence was established by the `let ... else` near the top of this function.
        let server_config = self.server_config.as_ref().unwrap().clone();

        let token = match IncomingToken::from_header(&header, &server_config, network_path.remote) {
            Ok(token) => token,
            Err(InvalidRetryTokenError) => {
                debug!("rejecting invalid retry token");
                return Some(DatagramEvent::Response(self.initial_close(
                    header.version,
                    network_path,
                    &crypto,
                    header.src_cid,
                    TransportError::INVALID_TOKEN(""),
                    buf,
                )));
            }
        };

        // Later datagrams carrying this destination CID are buffered until a decision is made.
        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
        self.index
            .insert_initial_incoming(header.dst_cid, incoming_idx);

        Some(DatagramEvent::NewConnection(Incoming {
            received_at: event.now,
            network_path,
            ecn: event.ecn,
            packet: InitialPacket {
                header,
                header_data: packet.header_data,
                payload: packet.payload,
            },
            rest: event.remaining,
            crypto,
            token,
            incoming_idx,
            improper_drop_warner: IncomingImproperDropWarner,
        }))
    }
540
541 pub fn accept(
544 &mut self,
545 mut incoming: Incoming,
546 now: Instant,
547 buf: &mut Vec<u8>,
548 server_config: Option<Arc<ServerConfig>>,
549 ) -> Result<(ConnectionHandle, Connection), Box<AcceptError>> {
550 let remote_address_validated = incoming.remote_address_validated();
551 incoming.improper_drop_warner.dismiss();
552 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
553 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
554
555 let packet_number = incoming.packet.header.number.expand(0);
556 let InitialHeader {
557 src_cid,
558 dst_cid,
559 version,
560 ..
561 } = incoming.packet.header;
562 let server_config =
563 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
564
565 if server_config
566 .transport
567 .max_idle_timeout
568 .is_some_and(|timeout| {
569 incoming.received_at + Duration::from_millis(timeout.into()) <= now
570 })
571 {
572 debug!("abandoning accept of stale initial");
573 self.index.remove_initial(dst_cid);
574 return Err(Box::new(AcceptError {
575 cause: ConnectionError::TimedOut,
576 response: None,
577 }));
578 }
579
580 if self.cids_exhausted() {
581 debug!("refusing connection");
582 self.index.remove_initial(dst_cid);
583 return Err(Box::new(AcceptError {
584 cause: ConnectionError::CidsExhausted,
585 response: Some(self.initial_close(
586 version,
587 incoming.network_path,
588 &incoming.crypto,
589 src_cid,
590 TransportError::CONNECTION_REFUSED(""),
591 buf,
592 )),
593 }));
594 }
595
596 if incoming
597 .crypto
598 .packet
599 .remote
600 .decrypt(
601 PathId::ZERO,
602 packet_number,
603 &incoming.packet.header_data,
604 &mut incoming.packet.payload,
605 )
606 .is_err()
607 {
608 debug!(packet_number, "failed to authenticate initial packet");
609 self.index.remove_initial(dst_cid);
610 return Err(Box::new(AcceptError {
611 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
612 response: None,
613 }));
614 };
615
616 let ch = ConnectionHandle(self.connections.vacant_key());
617 let loc_cid = self.new_cid(ch, PathId::ZERO);
618 let mut params = TransportParameters::new(
619 &server_config.transport,
620 &self.config,
621 self.local_cid_generator.as_ref(),
622 loc_cid,
623 Some(&server_config),
624 &mut self.rng,
625 );
626 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
627 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
628 params.retry_src_cid = incoming.token.retry_src_cid;
629 let mut pref_addr_cid = None;
630 if server_config.has_preferred_address() {
631 let cid = self.new_cid(ch, PathId::ZERO);
632 pref_addr_cid = Some(cid);
633 params.preferred_address = Some(PreferredAddress {
634 address_v4: server_config.preferred_address_v4,
635 address_v6: server_config.preferred_address_v6,
636 connection_id: cid,
637 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
638 });
639 }
640
641 let tls = server_config.crypto.clone().start_session(version, ¶ms);
642 let transport_config = server_config.transport.clone();
643 let mut conn = self.add_connection(
644 ch,
645 version,
646 dst_cid,
647 loc_cid,
648 src_cid,
649 incoming.network_path,
650 incoming.received_at,
651 tls,
652 transport_config,
653 SideArgs::Server {
654 server_config,
655 pref_addr_cid,
656 path_validated: remote_address_validated,
657 },
658 ¶ms,
659 );
660 self.index.insert_initial(dst_cid, ch);
661
662 match conn.handle_first_packet(
663 incoming.received_at,
664 incoming.network_path,
665 incoming.ecn,
666 packet_number,
667 incoming.packet,
668 incoming.rest,
669 ) {
670 Ok(()) => {
671 trace!(id = ch.0, icid = %dst_cid, "new connection");
672
673 for event in incoming_buffer.datagrams {
674 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
675 }
676
677 Ok((ch, conn))
678 }
679 Err(e) => {
680 debug!("handshake failed: {}", e);
681 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
682 let response = match e {
683 ConnectionError::TransportError(ref e) => Some(self.initial_close(
684 version,
685 incoming.network_path,
686 &incoming.crypto,
687 src_cid,
688 e.clone(),
689 buf,
690 )),
691 _ => None,
692 };
693 Err(Box::new(AcceptError { cause: e, response }))
694 }
695 }
696 }
697
698 fn early_validate_first_packet(
700 &mut self,
701 header: &ProtectedInitialHeader,
702 ) -> Result<(), TransportError> {
703 let config = &self.server_config.as_ref().unwrap();
704 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
705 return Err(TransportError::CONNECTION_REFUSED(""));
706 }
707
708 if header.dst_cid.len() < 8
713 && (header.token_pos.is_empty()
714 || header.dst_cid.len() != self.local_cid_generator.cid_len())
715 {
716 debug!(
717 "rejecting connection due to invalid DCID length {}",
718 header.dst_cid.len()
719 );
720 return Err(TransportError::PROTOCOL_VIOLATION(
721 "invalid destination CID length",
722 ));
723 }
724
725 Ok(())
726 }
727
728 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
730 self.clean_up_incoming(&incoming);
731 incoming.improper_drop_warner.dismiss();
732
733 self.initial_close(
734 incoming.packet.header.version,
735 incoming.network_path,
736 &incoming.crypto,
737 incoming.packet.header.src_cid,
738 TransportError::CONNECTION_REFUSED(""),
739 buf,
740 )
741 }
742
743 pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
747 if !incoming.may_retry() {
748 return Err(RetryError(Box::new(incoming)));
749 }
750
751 self.clean_up_incoming(&incoming);
752 incoming.improper_drop_warner.dismiss();
753
754 let server_config = self.server_config.as_ref().unwrap();
755
756 let loc_cid = self.local_cid_generator.generate_cid();
763
764 let payload = TokenPayload::Retry {
765 address: incoming.network_path.remote,
766 orig_dst_cid: incoming.packet.header.dst_cid,
767 issued: server_config.time_source.now(),
768 };
769 let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
770
771 let header = Header::Retry {
772 src_cid: loc_cid,
773 dst_cid: incoming.packet.header.src_cid,
774 version: incoming.packet.header.version,
775 };
776
777 let encode = header.encode(buf);
778 buf.put_slice(&token);
779 buf.extend_from_slice(&server_config.crypto.retry_tag(
780 incoming.packet.header.version,
781 incoming.packet.header.dst_cid,
782 buf,
783 ));
784 encode.finish(buf, &*incoming.crypto.header.local, None);
785
786 Ok(Transmit {
787 destination: incoming.network_path.remote,
788 ecn: None,
789 size: buf.len(),
790 segment_size: None,
791 src_ip: incoming.network_path.local_ip,
792 })
793 }
794
795 pub fn ignore(&mut self, incoming: Incoming) {
800 self.clean_up_incoming(&incoming);
801 incoming.improper_drop_warner.dismiss();
802 }
803
804 fn clean_up_incoming(&mut self, incoming: &Incoming) {
806 self.index.remove_initial(incoming.packet.header.dst_cid);
807 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
808 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
809 }
810
811 fn add_connection(
812 &mut self,
813 ch: ConnectionHandle,
814 version: u32,
815 init_cid: ConnectionId,
816 loc_cid: ConnectionId,
817 rem_cid: ConnectionId,
818 network_path: FourTuple,
819 now: Instant,
820 tls: Box<dyn crypto::Session>,
821 transport_config: Arc<TransportConfig>,
822 side_args: SideArgs,
823 params: &TransportParameters,
825 ) -> Connection {
826 let mut rng_seed = [0; 32];
827 self.rng.fill_bytes(&mut rng_seed);
828 let side = side_args.side();
829 let pref_addr_cid = side_args.pref_addr_cid();
830
831 let qlog =
832 transport_config.create_qlog_sink(side_args.side(), network_path.remote, init_cid, now);
833
834 qlog.emit_connection_started(
835 now,
836 loc_cid,
837 rem_cid,
838 network_path.remote,
839 network_path.local_ip,
840 params,
841 );
842
843 let conn = Connection::new(
844 self.config.clone(),
845 transport_config,
846 init_cid,
847 loc_cid,
848 rem_cid,
849 network_path,
850 tls,
851 self.local_cid_generator.as_ref(),
852 now,
853 version,
854 self.allow_mtud,
855 rng_seed,
856 side_args,
857 qlog,
858 );
859
860 let mut path_cids = PathLocalCids::default();
861 path_cids.cids.insert(path_cids.issued, loc_cid);
862 path_cids.issued += 1;
863
864 if let Some(cid) = pref_addr_cid {
865 debug_assert_eq!(path_cids.issued, 1, "preferred address cid seq must be 1");
866 path_cids.cids.insert(path_cids.issued, cid);
867 path_cids.issued += 1;
868 }
869
870 let id = self.connections.insert(ConnectionMeta {
871 init_cid,
872 loc_cids: FxHashMap::from_iter([(PathId::ZERO, path_cids)]),
873 network_path,
874 side,
875 reset_token: Default::default(),
876 });
877 debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
878
879 self.index.insert_conn(network_path, loc_cid, ch, side);
880
881 conn
882 }
883
884 fn initial_close(
885 &mut self,
886 version: u32,
887 network_path: FourTuple,
888 crypto: &Keys,
889 remote_id: ConnectionId,
890 reason: TransportError,
891 buf: &mut Vec<u8>,
892 ) -> Transmit {
893 let local_id = self.local_cid_generator.generate_cid();
897 let number = PacketNumber::U8(0);
898 let header = Header::Initial(InitialHeader {
899 dst_cid: remote_id,
900 src_cid: local_id,
901 number,
902 token: Bytes::new(),
903 version,
904 });
905
906 let partial_encode = header.encode(buf);
907 let max_len =
908 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
909 frame::Close::from(reason).encoder(max_len).encode(buf);
910 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
911 partial_encode.finish(
912 buf,
913 &*crypto.header.local,
914 Some((0, Default::default(), &*crypto.packet.local)),
915 );
916 Transmit {
917 destination: network_path.remote,
918 ecn: None,
919 size: buf.len(),
920 segment_size: None,
921 src_ip: network_path.local_ip,
922 }
923 }
924
925 pub fn config(&self) -> &EndpointConfig {
927 &self.config
928 }
929
930 pub fn open_connections(&self) -> usize {
932 self.connections.len()
933 }
934
935 pub fn incoming_buffer_bytes(&self) -> u64 {
938 self.all_incoming_buffers_total_bytes
939 }
940
941 #[cfg(test)]
942 pub(crate) fn known_connections(&self) -> usize {
943 let x = self.connections.len();
944 debug_assert_eq!(x, self.index.connection_ids_initial.len());
945 debug_assert!(x >= self.index.connection_reset_tokens.0.len());
947 debug_assert!(x >= self.index.incoming_connection_remotes.len());
949 debug_assert!(x >= self.index.outgoing_connection_remotes.len());
950 x
951 }
952
953 #[cfg(test)]
954 pub(crate) fn known_cids(&self) -> usize {
955 self.index.connection_ids.len()
956 }
957
958 fn cids_exhausted(&self) -> bool {
963 self.local_cid_generator.cid_len() <= 4
964 && self.local_cid_generator.cid_len() != 0
965 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
966 - self.index.connection_ids.len())
967 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
968 }
969}
970
971impl fmt::Debug for Endpoint {
972 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
973 fmt.debug_struct("Endpoint")
974 .field("rng", &self.rng)
975 .field("index", &self.index)
976 .field("connections", &self.connections)
977 .field("config", &self.config)
978 .field("server_config", &self.server_config)
979 .field("incoming_buffers.len", &self.incoming_buffers.len())
981 .field(
982 "all_incoming_buffers_total_bytes",
983 &self.all_incoming_buffers_total_bytes,
984 )
985 .finish()
986 }
987}
988
/// Datagrams received for a connection attempt that is still pending as an `Incoming`.
#[derive(Default)]
struct IncomingBuffer {
    /// Buffered datagrams, in arrival order.
    datagrams: Vec<DatagramConnectionEvent>,
    /// Sum of the lengths of all buffered datagrams.
    total_bytes: u64,
}
995
/// Where a received datagram should be delivered.
#[derive(Copy, Clone, Debug)]
enum RouteDatagramTo {
    /// Buffer for a pending connection attempt; the value indexes `Endpoint::incoming_buffers`.
    Incoming(usize),
    /// An established connection and the path the datagram belongs to.
    Connection(ConnectionHandle, PathId),
}
1002
/// Lookup tables used to route received datagrams.
#[derive(Default, Debug)]
struct ConnectionIndex {
    /// Routes keyed by the destination CID of a client's first Initial packet.
    ///
    /// Consulted only for Initial and 0-RTT packets.
    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
    /// Connections and paths keyed by locally issued, non-empty connection IDs.
    connection_ids: FxHashMap<ConnectionId, (ConnectionHandle, PathId)>,
    /// Server-side connections using zero-length local CIDs, keyed by four-tuple.
    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
    /// Client-side connections using zero-length local CIDs, keyed by remote address.
    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
    /// Reset tokens registered by connections, matched against the tail of datagrams
    /// that could not be routed any other way.
    connection_reset_tokens: ResetTokenTable,
}
1036
1037impl ConnectionIndex {
1038 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1040 if dst_cid.is_empty() {
1041 return;
1042 }
1043 self.connection_ids_initial
1044 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1045 }
1046
1047 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1049 if dst_cid.is_empty() {
1050 return;
1051 }
1052 let removed = self.connection_ids_initial.remove(&dst_cid);
1053 debug_assert!(removed.is_some());
1054 }
1055
1056 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1058 if dst_cid.is_empty() {
1059 return;
1060 }
1061 self.connection_ids_initial.insert(
1062 dst_cid,
1063 RouteDatagramTo::Connection(connection, PathId::ZERO),
1064 );
1065 }
1066
1067 fn insert_conn(
1070 &mut self,
1071 network_path: FourTuple,
1072 dst_cid: ConnectionId,
1073 connection: ConnectionHandle,
1074 side: Side,
1075 ) {
1076 match dst_cid.len() {
1077 0 => match side {
1078 Side::Server => {
1079 self.incoming_connection_remotes
1080 .insert(network_path, connection);
1081 }
1082 Side::Client => {
1083 self.outgoing_connection_remotes
1084 .insert(network_path.remote, connection);
1085 }
1086 },
1087 _ => {
1088 self.connection_ids
1089 .insert(dst_cid, (connection, PathId::ZERO));
1090 }
1091 }
1092 }
1093
1094 fn retire(&mut self, dst_cid: ConnectionId) {
1096 self.connection_ids.remove(&dst_cid);
1097 }
1098
1099 fn remove(&mut self, conn: &ConnectionMeta) {
1101 if conn.side.is_server() {
1102 self.remove_initial(conn.init_cid);
1103 }
1104 for cid in conn.loc_cids.values().flat_map(|pcids| pcids.cids.values()) {
1105 self.connection_ids.remove(cid);
1106 }
1107 self.incoming_connection_remotes.remove(&conn.network_path);
1108 self.outgoing_connection_remotes
1109 .remove(&conn.network_path.remote);
1110 for (remote, token) in conn.reset_token.values() {
1111 self.connection_reset_tokens.remove(*remote, *token);
1112 }
1113 }
1114
1115 fn get(&self, network_path: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1117 if !datagram.dst_cid().is_empty() {
1118 if let Some(&(ch, path_id)) = self.connection_ids.get(&datagram.dst_cid()) {
1119 return Some(RouteDatagramTo::Connection(ch, path_id));
1120 }
1121 }
1122 if datagram.is_initial() || datagram.is_0rtt() {
1123 if let Some(&ch) = self.connection_ids_initial.get(&datagram.dst_cid()) {
1124 return Some(ch);
1125 }
1126 }
1127 if datagram.dst_cid().is_empty() {
1128 if let Some(&ch) = self.incoming_connection_remotes.get(network_path) {
1129 return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1132 }
1133 if let Some(&ch) = self.outgoing_connection_remotes.get(&network_path.remote) {
1134 return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1136 }
1137 }
1138 let data = datagram.data();
1139 if data.len() < RESET_TOKEN_SIZE {
1140 return None;
1141 }
1142 self.connection_reset_tokens
1145 .get(network_path.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1146 .cloned()
1147 .map(|ch| RouteDatagramTo::Connection(ch, PathId::ZERO))
1148 }
1149}
1150
/// Endpoint-side bookkeeping for one connection, used to clean up routing entries.
#[derive(Debug)]
pub(crate) struct ConnectionMeta {
    /// CID the connection was created with; for server-side connections this is the key
    /// of its initial-DCID route.
    init_cid: ConnectionId,
    /// Locally issued CIDs still registered for this connection, grouped by path.
    loc_cids: FxHashMap<PathId, PathLocalCids>,
    /// Network path recorded when the connection was created; used to remove
    /// address-based routes.
    network_path: FourTuple,
    /// Whether this is the client or the server side of the connection.
    side: Side,
    /// Reset token, and the remote address it is registered under, for each path.
    reset_token: FxHashMap<PathId, (SocketAddr, ResetToken)>,
}
1172
/// Locally issued connection IDs for a single path.
#[derive(Debug, Default)]
struct PathLocalCids {
    /// Number of CIDs issued so far; also the sequence number of the next one.
    issued: u64,
    /// Issued CIDs that have not been retired, keyed by sequence number.
    cids: FxHashMap<u64, ConnectionId>,
}
1183
/// Identifier of a connection within an `Endpoint`.
///
/// Wraps the connection's key in the endpoint's slab of connection metadata.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ConnectionHandle(pub usize);
1187
1188impl From<ConnectionHandle> for usize {
1189 fn from(x: ConnectionHandle) -> Self {
1190 x.0
1191 }
1192}
1193
1194impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1195 type Output = ConnectionMeta;
1196 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1197 &self[ch.0]
1198 }
1199}
1200
1201impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1202 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1203 &mut self[ch.0]
1204 }
1205}
1206
/// Outcome of `Endpoint::handle` for a datagram that was neither dropped nor buffered.
pub enum DatagramEvent {
    /// The datagram belongs to an existing connection and must be passed to it.
    ConnectionEvent(ConnectionHandle, ConnectionEvent),
    /// The datagram starts a new connection attempt awaiting a decision.
    NewConnection(Incoming),
    /// A response was written to the caller's buffer and should be transmitted.
    Response(Transmit),
}
1216
/// A connection attempt that has not been accepted, refused, retried or ignored yet.
///
/// Must be handed back to the `Endpoint` through one of those operations; dropping it
/// instead logs a warning and leaves endpoint state behind.
pub struct Incoming {
    /// Time at which the first packet was received.
    received_at: Instant,
    /// Addresses the first packet was received on/from.
    network_path: FourTuple,
    /// ECN codepoint of the first datagram, if known.
    ecn: Option<EcnCodepoint>,
    /// The decoded (but not yet decrypted) Initial packet.
    packet: InitialPacket,
    /// Remainder of the first datagram after the Initial packet, if any.
    rest: Option<BytesMut>,
    /// Initial keys derived for this attempt.
    crypto: Keys,
    /// Token information extracted from the Initial packet's header.
    token: IncomingToken,
    /// Index of this attempt's buffer in `Endpoint::incoming_buffers`.
    incoming_idx: usize,
    /// Emits a warning if this value is dropped without being dismissed.
    improper_drop_warner: IncomingImproperDropWarner,
}
1229
1230impl Incoming {
1231 pub fn local_ip(&self) -> Option<IpAddr> {
1235 self.network_path.local_ip
1236 }
1237
1238 pub fn remote_address(&self) -> SocketAddr {
1240 self.network_path.remote
1241 }
1242
1243 pub fn remote_address_validated(&self) -> bool {
1251 self.token.validated
1252 }
1253
1254 pub fn may_retry(&self) -> bool {
1259 self.token.retry_src_cid.is_none()
1260 }
1261
1262 pub fn orig_dst_cid(&self) -> ConnectionId {
1264 self.token.orig_dst_cid
1265 }
1266}
1267
1268impl fmt::Debug for Incoming {
1269 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1270 f.debug_struct("Incoming")
1271 .field("network_path", &self.network_path)
1272 .field("ecn", &self.ecn)
1273 .field("token", &self.token)
1276 .field("incoming_idx", &self.incoming_idx)
1277 .finish_non_exhaustive()
1279 }
1280}
1281
/// Guard that logs a warning when dropped, unless it was dismissed first.
struct IncomingImproperDropWarner;
1283
1284impl IncomingImproperDropWarner {
1285 fn dismiss(self) {
1286 mem::forget(self);
1287 }
1288}
1289
/// Reached only when the guard was not dismissed, i.e. the owning `Incoming` was dropped
/// instead of being passed back to the endpoint.
impl Drop for IncomingImproperDropWarner {
    fn drop(&mut self) {
        warn!(
            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
             (may cause memory leak and eventual inability to accept new connections)"
        );
    }
}
1298
/// Errors in the parameters or state that prevent a connection from being initiated.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectError {
    /// The endpoint is stopping. Not produced in this file — presumably raised by a
    /// higher layer; confirm against callers.
    #[error("endpoint stopping")]
    EndpointStopping,
    /// Too few local connection IDs remain to start another connection.
    #[error("CIDs exhausted")]
    CidsExhausted,
    /// The server name was rejected. Not produced directly in this file — presumably
    /// reported by the crypto layer; confirm against its `start_session`.
    #[error("invalid server name: {0}")]
    InvalidServerName(String),
    /// The remote address has port 0 or an unspecified IP address.
    #[error("invalid remote address: {0}")]
    InvalidRemoteAddress(SocketAddr),
    /// No default client configuration is available. Not produced in this file —
    /// presumably raised by a higher layer; confirm against callers.
    #[error("no default client config")]
    NoDefaultClientConfig,
    /// The client configuration's version is not in the endpoint's supported versions.
    #[error("unsupported QUIC version")]
    UnsupportedVersion,
}
1331
/// Error returned by `Endpoint::accept` when a connection attempt cannot be accepted.
#[derive(Debug)]
pub struct AcceptError {
    /// Why the attempt was not accepted.
    pub cause: ConnectionError,
    /// Close packet to send to the peer, if one was written to the caller's buffer.
    pub response: Option<Transmit>,
}
1340
/// Error returned by `Endpoint::retry` when the attempt may not be retried.
///
/// Carries the untouched `Incoming` so the caller can still accept, refuse or ignore it.
#[derive(Debug, Error)]
#[error("retry() with validated Incoming")]
pub struct RetryError(Box<Incoming>);
1345
1346impl RetryError {
1347 pub fn into_incoming(self) -> Incoming {
1349 *self.0
1350 }
1351}
1352
/// Reset tokens registered by connections, grouped by the remote address they apply to.
///
/// Each token maps to the connection that registered it.
#[derive(Default, Debug)]
struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1359
1360impl ResetTokenTable {
1361 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1362 self.0
1363 .entry(remote)
1364 .or_default()
1365 .insert(token, ch)
1366 .is_some()
1367 }
1368
1369 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1370 use std::collections::hash_map::Entry;
1371 match self.0.entry(remote) {
1372 Entry::Vacant(_) => {}
1373 Entry::Occupied(mut e) => {
1374 e.get_mut().remove(&token);
1375 if e.get().is_empty() {
1376 e.remove_entry();
1377 }
1378 }
1379 }
1380 }
1381
1382 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1383 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1384 self.0.get(&remote)?.get(&token)
1385 }
1386}