1use std::{
2 collections::{HashMap, hash_map},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use bytes::{BufMut, Bytes, BytesMut};
11use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
12use rustc_hash::FxHashMap;
13use slab::Slab;
14use thiserror::Error;
15use tracing::{debug, error, trace, warn};
16
17use crate::{
18 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, PathId, RESET_TOKEN_SIZE,
19 ResetToken, Side, Transmit, TransportConfig, TransportError,
20 cid_generator::ConnectionIdGenerator,
21 coding::BufMutExt,
22 config::{ClientConfig, EndpointConfig, ServerConfig},
23 connection::{Connection, ConnectionError, SideArgs},
24 crypto::{self, Keys, UnsupportedVersion},
25 frame,
26 packet::{
27 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
28 PacketNumber, PartialDecode, ProtectedInitialHeader,
29 },
30 shared::{
31 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32 EndpointEvent, EndpointEventInner, IssuedCid,
33 },
34 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
35 transport_parameters::{PreferredAddress, TransportParameters},
36};
37
/// Endpoint-wide state shared by all connections: datagram routing tables, per-connection
/// bookkeeping, and buffers for datagrams belonging to not-yet-accepted [`Incoming`] attempts.
pub struct Endpoint {
    /// Randomness for version negotiation bits, stateless reset padding, retry tokens, and the
    /// seeds handed to newly created connections
    rng: StdRng,
    /// Lookup tables used by `handle` to route incoming datagrams
    index: ConnectionIndex,
    /// Per-connection metadata, keyed by the `usize` wrapped in a [`ConnectionHandle`]
    connections: Slab<ConnectionMeta>,
    /// Produces (and validates) locally issued connection IDs
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    /// Endpoint-wide configuration
    config: Arc<EndpointConfig>,
    /// Configuration for incoming connections; when `None`, unsupported-version packets are
    /// dropped and unknown initial packets are answered with a stateless reset at most
    server_config: Option<Arc<ServerConfig>>,
    /// Forwarded unchanged to every `Connection::new` call
    allow_mtud: bool,
    /// Time the most recent stateless reset was sent, used to rate-limit resets
    last_stateless_reset: Option<Instant>,
    /// Datagrams received for `Incoming`s that have not yet been accepted, refused, retried or
    /// ignored; keyed by `Incoming::incoming_idx`
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of `total_bytes` over every entry of `incoming_buffers`
    all_incoming_buffers_total_bytes: u64,
}
57
58impl Endpoint {
59 pub fn new(
70 config: Arc<EndpointConfig>,
71 server_config: Option<Arc<ServerConfig>>,
72 allow_mtud: bool,
73 rng_seed: Option<[u8; 32]>,
74 ) -> Self {
75 let rng_seed = rng_seed.or(config.rng_seed);
76 Self {
77 rng: rng_seed.map_or(StdRng::from_os_rng(), StdRng::from_seed),
78 index: ConnectionIndex::default(),
79 connections: Slab::new(),
80 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
81 config,
82 server_config,
83 allow_mtud,
84 last_stateless_reset: None,
85 incoming_buffers: Slab::new(),
86 all_incoming_buffers_total_bytes: 0,
87 }
88 }
89
    /// Replace the server configuration used for subsequently handled datagrams
    ///
    /// Passing `None` removes the server configuration altogether.
    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
        self.server_config = server_config;
    }
94
95 pub fn handle_event(
99 &mut self,
100 ch: ConnectionHandle,
101 event: EndpointEvent,
102 ) -> Option<ConnectionEvent> {
103 use EndpointEventInner::*;
104 match event.0 {
105 NeedIdentifiers(path_id, now, n) => {
106 return Some(self.send_new_identifiers(path_id, now, ch, n));
107 }
108 ResetToken(path_id, remote, token) => {
109 if let Some(old) = self.connections[ch]
110 .reset_token
111 .insert(path_id, (remote, token))
112 {
113 self.index.connection_reset_tokens.remove(old.0, old.1);
114 }
115 if self.index.connection_reset_tokens.insert(remote, token, ch) {
116 warn!("duplicate reset token");
117 }
118 }
119 RetireResetToken(path_id) => {
120 if let Some(old) = self.connections[ch].reset_token.remove(&path_id) {
121 self.index.connection_reset_tokens.remove(old.0, old.1);
122 }
123 }
124 RetireConnectionId(now, path_id, seq, allow_more_cids) => {
125 if let Some(cid) = self.connections[ch]
126 .loc_cids
127 .get_mut(&path_id)
128 .and_then(|pcid| pcid.cids.remove(&seq))
129 {
130 trace!(%path_id, "local CID retired {}: {}", seq, cid);
131 self.index.retire(cid);
132 if allow_more_cids {
133 return Some(self.send_new_identifiers(path_id, now, ch, 1));
134 }
135 }
136 }
137 Drained => {
138 if let Some(conn) = self.connections.try_remove(ch.0) {
139 self.index.remove(&conn);
140 } else {
141 error!(id = ch.0, "unknown connection drained");
145 }
146 }
147 }
148 None
149 }
150
    /// Process an incoming UDP datagram
    ///
    /// `remote` is the sender, `local_ip` the local address the datagram arrived on (if known),
    /// `ecn` its ECN codepoint, and `data` its payload. `buf` receives the bytes of any
    /// response; the returned `Transmit::size` is taken from `buf.len()`.
    ///
    /// Returns:
    /// - `DatagramEvent::ConnectionEvent` when the datagram belongs to a known connection,
    /// - `DatagramEvent::NewConnection` when it starts a new connection attempt,
    /// - `DatagramEvent::Response` when a version negotiation, close, or stateless reset
    ///   packet was written to `buf`,
    /// - `None` when the datagram was dropped or buffered for a pending `Incoming`.
    pub fn handle(
        &mut self,
        now: Instant,
        remote: SocketAddr,
        local_ip: Option<IpAddr>,
        ecn: Option<EcnCodepoint>,
        data: BytesMut,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let datagram_len = data.len();
        // Decode the first packet's header; the event is built up front so it can be forwarded
        // or buffered as-is once the destination is known
        let mut event = match PartialDecode::new(
            data,
            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
            &self.config.supported_versions,
            self.config.grease_quic_bit,
        ) {
            Ok((first_decode, remaining)) => DatagramConnectionEvent {
                now,
                remote,
                // Placeholder; overwritten below once the datagram has been routed
                path_id: PathId::ZERO,
                ecn,
                first_decode,
                remaining,
            },
            Err(PacketDecodeError::UnsupportedVersion {
                src_cid,
                dst_cid,
                version,
            }) => {
                // Without a server config this endpoint does not negotiate versions
                if self.server_config.is_none() {
                    debug!("dropping packet with unsupported version");
                    return None;
                }
                trace!("sending version negotiation");
                // Build a version negotiation packet with the peer's CIDs swapped
                Header::VersionNegotiate {
                    random: self.rng.random::<u8>() | 0x40,
                    src_cid: dst_cid,
                    dst_cid: src_cid,
                }
                .encode(buf);
                // Lead the version list with an extra `0x?a?a?a?a`-pattern version, picked so
                // that it never equals the version the peer just offered
                buf.write::<u32>(match version {
                    0x0a1a_2a3a => 0x0a1a_2a4a,
                    _ => 0x0a1a_2a3a,
                });
                for &version in &self.config.supported_versions {
                    buf.write(version);
                }
                return Some(DatagramEvent::Response(Transmit {
                    destination: remote,
                    ecn: None,
                    size: buf.len(),
                    segment_size: None,
                    src_ip: local_ip,
                }));
            }
            Err(e) => {
                trace!("malformed header: {}", e);
                return None;
            }
        };

        let addresses = FourTuple { remote, local_ip };
        let dst_cid = event.first_decode.dst_cid();

        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
            // Known destination: record which path the datagram arrived on, then dispatch
            event.path_id = match route_to {
                RouteDatagramTo::Incoming(_) => PathId::ZERO,
                RouteDatagramTo::Connection(_, path_id) => path_id,
            };
            match route_to {
                RouteDatagramTo::Incoming(incoming_idx) => {
                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
                    let config = &self.server_config.as_ref().unwrap();

                    // Buffer the datagram only while both the per-incoming and the
                    // endpoint-wide byte limits hold; otherwise it is silently dropped
                    if incoming_buffer
                        .total_bytes
                        .checked_add(datagram_len as u64)
                        .is_some_and(|n| n <= config.incoming_buffer_size)
                        && self
                            .all_incoming_buffers_total_bytes
                            .checked_add(datagram_len as u64)
                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
                    {
                        incoming_buffer.datagrams.push(event);
                        incoming_buffer.total_bytes += datagram_len as u64;
                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
                    }

                    None
                }
                RouteDatagramTo::Connection(ch, _path_id) => Some(DatagramEvent::ConnectionEvent(
                    ch,
                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
                )),
            }
        } else if event.first_decode.initial_header().is_some() {
            // Unrouted initial packet: possibly the start of a new connection
            self.handle_first_packet(datagram_len, event, addresses, buf)
        } else if event.first_decode.has_long_header() {
            debug!(
                "ignoring non-initial packet for unknown connection {}",
                dst_cid
            );
            None
        } else if !event.first_decode.is_initial()
            && self.local_cid_generator.validate(dst_cid).is_err()
        {
            debug!("dropping packet with invalid CID");
            None
        } else if dst_cid.is_empty() {
            trace!("dropping unrecognized short packet without ID");
            None
        } else {
            // Plausible-looking packet for a connection we do not know: try a stateless reset
            self.stateless_reset(now, datagram_len, addresses, dst_cid, buf)
                .map(DatagramEvent::Response)
        }
    }
275
    /// Write a stateless reset for `dst_cid` into `buf`, if one may be sent
    ///
    /// Returns `None` without touching `buf` when the previous reset was sent less than
    /// `config.min_reset_interval` ago, or when the inciting datagram is too small for a
    /// strictly smaller reset to be built.
    fn stateless_reset(
        &mut self,
        now: Instant,
        inciting_dgram_len: usize,
        addresses: FourTuple,
        dst_cid: ConnectionId,
        buf: &mut Vec<u8>,
    ) -> Option<Transmit> {
        // Rate limit: at most one reset per `min_reset_interval`
        if self
            .last_stateless_reset
            .is_some_and(|last| last + self.config.min_reset_interval > now)
        {
            debug!("ignoring unexpected packet within minimum stateless reset interval");
            return None;
        }

        /// Smallest number of random bytes placed in front of the reset token
        const MIN_PADDING_LEN: usize = 5;

        // Cap the padding so the reset (padding + token) is at least one byte shorter than the
        // datagram that triggered it
        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
            _ => {
                debug!(
                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
                    inciting_dgram_len
                );
                return None;
            }
        };

        debug!(
            "sending stateless reset for {} to {}",
            dst_cid, addresses.remote
        );
        self.last_stateless_reset = Some(now);
        // Preferred lower bound for the padding: room for a maximum-length CID on top of the
        // minimum padding
        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
            max_padding_len
        } else {
            self.rng
                .random_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
        };
        buf.reserve(padding_len + RESET_TOKEN_SIZE);
        buf.resize(padding_len, 0);
        self.rng.fill_bytes(&mut buf[0..padding_len]);
        // Force the two most significant bits of the first byte to `01`
        buf[0] = 0b0100_0000 | (buf[0] >> 2);
        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));

        debug_assert!(buf.len() < inciting_dgram_len);

        Some(Transmit {
            destination: addresses.remote,
            ecn: None,
            size: buf.len(),
            segment_size: None,
            src_ip: addresses.local_ip,
        })
    }
338
339 pub fn connect(
341 &mut self,
342 now: Instant,
343 config: ClientConfig,
344 remote: SocketAddr,
345 server_name: &str,
346 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
347 if self.cids_exhausted() {
348 return Err(ConnectError::CidsExhausted);
349 }
350 if remote.port() == 0 || remote.ip().is_unspecified() {
351 return Err(ConnectError::InvalidRemoteAddress(remote));
352 }
353 if !self.config.supported_versions.contains(&config.version) {
354 return Err(ConnectError::UnsupportedVersion);
355 }
356
357 let remote_id = (config.initial_dst_cid_provider)();
358 trace!(initial_dcid = %remote_id);
359
360 let ch = ConnectionHandle(self.connections.vacant_key());
361 let loc_cid = self.new_cid(ch, PathId::ZERO);
362 let params = TransportParameters::new(
363 &config.transport,
364 &self.config,
365 self.local_cid_generator.as_ref(),
366 loc_cid,
367 None,
368 &mut self.rng,
369 );
370 let tls = config
371 .crypto
372 .start_session(config.version, server_name, ¶ms)?;
373
374 let conn = self.add_connection(
375 ch,
376 config.version,
377 remote_id,
378 loc_cid,
379 remote_id,
380 FourTuple {
381 remote,
382 local_ip: None,
383 },
384 now,
385 tls,
386 config.transport,
387 SideArgs::Client {
388 token_store: config.token_store,
389 server_name: server_name.into(),
390 },
391 ¶ms,
392 );
393 Ok((ch, conn))
394 }
395
396 fn send_new_identifiers(
398 &mut self,
399 path_id: PathId,
400 now: Instant,
401 ch: ConnectionHandle,
402 num: u64,
403 ) -> ConnectionEvent {
404 let mut ids = vec![];
405 for _ in 0..num {
406 let id = self.new_cid(ch, path_id);
407 let cid_meta = self.connections[ch].loc_cids.entry(path_id).or_default();
408 let sequence = cid_meta.issued;
409 cid_meta.issued += 1;
410 cid_meta.cids.insert(sequence, id);
411 ids.push(IssuedCid {
412 path_id,
413 sequence,
414 id,
415 reset_token: ResetToken::new(&*self.config.reset_key, id),
416 });
417 }
418 ConnectionEvent(ConnectionEventInner::NewIdentifiers(
419 ids,
420 now,
421 self.local_cid_generator.cid_len(),
422 self.local_cid_generator.cid_lifetime(),
423 ))
424 }
425
426 fn new_cid(&mut self, ch: ConnectionHandle, path_id: PathId) -> ConnectionId {
428 loop {
429 let cid = self.local_cid_generator.generate_cid();
430 if cid.is_empty() {
431 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
433 return cid;
434 }
435 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
436 e.insert((ch, path_id));
437 break cid;
438 }
439 }
440 }
441
    /// Handle an initial packet that did not match any known connection
    ///
    /// Returns `DatagramEvent::NewConnection` carrying an [`Incoming`] when the packet passes
    /// early validation, `DatagramEvent::Response` when a close or stateless reset was written
    /// to `buf`, and `None` when the packet is dropped.
    ///
    /// # Panics
    ///
    /// Panics if `event.first_decode` is not an initial packet.
    fn handle_first_packet(
        &mut self,
        datagram_len: usize,
        event: DatagramConnectionEvent,
        addresses: FourTuple,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let dst_cid = event.first_decode.dst_cid();
        let header = event.first_decode.initial_header().unwrap();

        // Without a server config no connection can be accepted; at most send a reset
        let Some(server_config) = &self.server_config else {
            debug!("packet for unrecognized connection {}", dst_cid);
            return self
                .stateless_reset(event.now, datagram_len, addresses, dst_cid, buf)
                .map(DatagramEvent::Response);
        };

        if datagram_len < MIN_INITIAL_SIZE as usize {
            debug!("ignoring short initial for connection {}", dst_cid);
            return None;
        }

        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
            Ok(keys) => keys,
            Err(UnsupportedVersion) => {
                debug!(
                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
                    header.version
                );
                return None;
            }
        };

        // Checks that only need the still-protected header; failures are answered with a close
        if let Err(reason) = self.early_validate_first_packet(header) {
            return Some(DatagramEvent::Response(self.initial_close(
                header.version,
                addresses,
                &crypto,
                header.src_cid,
                reason,
                buf,
            )));
        }

        // Finish decoding with the initial header keys
        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
            Ok(packet) => packet,
            Err(e) => {
                trace!("unable to decode initial packet: {}", e);
                return None;
            }
        };

        if !packet.reserved_bits_valid() {
            debug!("dropping connection attempt with invalid reserved bits");
            return None;
        }

        let Header::Initial(header) = packet.header else {
            panic!("non-initial packet in handle_first_packet()");
        };

        // Clone the `Arc` so the borrow of `self.server_config` ends before `self` is mutated
        let server_config = self.server_config.as_ref().unwrap().clone();

        let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
            Ok(token) => token,
            Err(InvalidRetryTokenError) => {
                debug!("rejecting invalid retry token");
                return Some(DatagramEvent::Response(self.initial_close(
                    header.version,
                    addresses,
                    &crypto,
                    header.src_cid,
                    TransportError::INVALID_TOKEN(""),
                    buf,
                )));
            }
        };

        // Start buffering further datagrams addressed to this initial DCID until the
        // application decides what to do with the `Incoming`
        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
        self.index
            .insert_initial_incoming(header.dst_cid, incoming_idx);

        Some(DatagramEvent::NewConnection(Incoming {
            received_at: event.now,
            addresses,
            ecn: event.ecn,
            packet: InitialPacket {
                header,
                header_data: packet.header_data,
                payload: packet.payload,
            },
            rest: event.remaining,
            crypto,
            token,
            incoming_idx,
            improper_drop_warner: IncomingImproperDropWarner,
        }))
    }
542
543 pub fn accept(
546 &mut self,
547 mut incoming: Incoming,
548 now: Instant,
549 buf: &mut Vec<u8>,
550 server_config: Option<Arc<ServerConfig>>,
551 ) -> Result<(ConnectionHandle, Connection), Box<AcceptError>> {
552 let remote_address_validated = incoming.remote_address_validated();
553 incoming.improper_drop_warner.dismiss();
554 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
555 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
556
557 let packet_number = incoming.packet.header.number.expand(0);
558 let InitialHeader {
559 src_cid,
560 dst_cid,
561 version,
562 ..
563 } = incoming.packet.header;
564 let server_config =
565 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
566
567 if server_config
568 .transport
569 .max_idle_timeout
570 .is_some_and(|timeout| {
571 incoming.received_at + Duration::from_millis(timeout.into()) <= now
572 })
573 {
574 debug!("abandoning accept of stale initial");
575 self.index.remove_initial(dst_cid);
576 return Err(Box::new(AcceptError {
577 cause: ConnectionError::TimedOut,
578 response: None,
579 }));
580 }
581
582 if self.cids_exhausted() {
583 debug!("refusing connection");
584 self.index.remove_initial(dst_cid);
585 return Err(Box::new(AcceptError {
586 cause: ConnectionError::CidsExhausted,
587 response: Some(self.initial_close(
588 version,
589 incoming.addresses,
590 &incoming.crypto,
591 src_cid,
592 TransportError::CONNECTION_REFUSED(""),
593 buf,
594 )),
595 }));
596 }
597
598 if incoming
599 .crypto
600 .packet
601 .remote
602 .decrypt(
603 PathId::ZERO,
604 packet_number,
605 &incoming.packet.header_data,
606 &mut incoming.packet.payload,
607 )
608 .is_err()
609 {
610 debug!(packet_number, "failed to authenticate initial packet");
611 self.index.remove_initial(dst_cid);
612 return Err(Box::new(AcceptError {
613 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
614 response: None,
615 }));
616 };
617
618 let ch = ConnectionHandle(self.connections.vacant_key());
619 let loc_cid = self.new_cid(ch, PathId::ZERO);
620 let mut params = TransportParameters::new(
621 &server_config.transport,
622 &self.config,
623 self.local_cid_generator.as_ref(),
624 loc_cid,
625 Some(&server_config),
626 &mut self.rng,
627 );
628 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
629 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
630 params.retry_src_cid = incoming.token.retry_src_cid;
631 let mut pref_addr_cid = None;
632 if server_config.has_preferred_address() {
633 let cid = self.new_cid(ch, PathId::ZERO);
634 pref_addr_cid = Some(cid);
635 params.preferred_address = Some(PreferredAddress {
636 address_v4: server_config.preferred_address_v4,
637 address_v6: server_config.preferred_address_v6,
638 connection_id: cid,
639 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
640 });
641 }
642
643 let tls = server_config.crypto.clone().start_session(version, ¶ms);
644 let transport_config = server_config.transport.clone();
645 let mut conn = self.add_connection(
646 ch,
647 version,
648 dst_cid,
649 loc_cid,
650 src_cid,
651 incoming.addresses,
652 incoming.received_at,
653 tls,
654 transport_config,
655 SideArgs::Server {
656 server_config,
657 pref_addr_cid,
658 path_validated: remote_address_validated,
659 },
660 ¶ms,
661 );
662 self.index.insert_initial(dst_cid, ch);
663
664 match conn.handle_first_packet(
665 incoming.received_at,
666 incoming.addresses.remote,
667 incoming.ecn,
668 packet_number,
669 incoming.packet,
670 incoming.rest,
671 ) {
672 Ok(()) => {
673 trace!(id = ch.0, icid = %dst_cid, "new connection");
674
675 for event in incoming_buffer.datagrams {
676 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
677 }
678
679 Ok((ch, conn))
680 }
681 Err(e) => {
682 debug!("handshake failed: {}", e);
683 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
684 let response = match e {
685 ConnectionError::TransportError(ref e) => Some(self.initial_close(
686 version,
687 incoming.addresses,
688 &incoming.crypto,
689 src_cid,
690 e.clone(),
691 buf,
692 )),
693 _ => None,
694 };
695 Err(Box::new(AcceptError { cause: e, response }))
696 }
697 }
698 }
699
700 fn early_validate_first_packet(
702 &mut self,
703 header: &ProtectedInitialHeader,
704 ) -> Result<(), TransportError> {
705 let config = &self.server_config.as_ref().unwrap();
706 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
707 return Err(TransportError::CONNECTION_REFUSED(""));
708 }
709
710 if header.dst_cid.len() < 8
715 && (header.token_pos.is_empty()
716 || header.dst_cid.len() != self.local_cid_generator.cid_len())
717 {
718 debug!(
719 "rejecting connection due to invalid DCID length {}",
720 header.dst_cid.len()
721 );
722 return Err(TransportError::PROTOCOL_VIOLATION(
723 "invalid destination CID length",
724 ));
725 }
726
727 Ok(())
728 }
729
730 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
732 self.clean_up_incoming(&incoming);
733 incoming.improper_drop_warner.dismiss();
734
735 self.initial_close(
736 incoming.packet.header.version,
737 incoming.addresses,
738 &incoming.crypto,
739 incoming.packet.header.src_cid,
740 TransportError::CONNECTION_REFUSED(""),
741 buf,
742 )
743 }
744
    /// Respond with a retry packet, requiring the client to retry with address validation
    ///
    /// The retry packet is written to `buf` and described by the returned `Transmit`.
    ///
    /// # Errors
    ///
    /// Returns the `Incoming` back inside a `RetryError` when `incoming.may_retry()` is false.
    ///
    /// # Panics
    ///
    /// Panics if the endpoint has no server config.
    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
        if !incoming.may_retry() {
            return Err(RetryError(Box::new(incoming)));
        }

        self.clean_up_incoming(&incoming);
        incoming.improper_drop_warner.dismiss();

        let server_config = self.server_config.as_ref().unwrap();

        // Source CID of the retry packet; it is generated directly and not registered in the
        // routing index
        let loc_cid = self.local_cid_generator.generate_cid();

        // The token binds the client's address and its original destination CID
        let payload = TokenPayload::Retry {
            address: incoming.addresses.remote,
            orig_dst_cid: incoming.packet.header.dst_cid,
            issued: server_config.time_source.now(),
        };
        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);

        let header = Header::Retry {
            src_cid: loc_cid,
            dst_cid: incoming.packet.header.src_cid,
            version: incoming.packet.header.version,
        };

        // Layout: header, token, then the retry tag computed over everything written so far
        let encode = header.encode(buf);
        buf.put_slice(&token);
        buf.extend_from_slice(&server_config.crypto.retry_tag(
            incoming.packet.header.version,
            incoming.packet.header.dst_cid,
            buf,
        ));
        encode.finish(buf, &*incoming.crypto.header.local, None);

        Ok(Transmit {
            destination: incoming.addresses.remote,
            ecn: None,
            size: buf.len(),
            segment_size: None,
            src_ip: incoming.addresses.local_ip,
        })
    }
796
    /// Ignore this incoming connection attempt, not sending any packet in response
    ///
    /// Releases the attempt's routing entry and buffered datagrams.
    pub fn ignore(&mut self, incoming: Incoming) {
        self.clean_up_incoming(&incoming);
        incoming.improper_drop_warner.dismiss();
    }
805
806 fn clean_up_incoming(&mut self, incoming: &Incoming) {
808 self.index.remove_initial(incoming.packet.header.dst_cid);
809 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
810 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
811 }
812
    /// Create a `Connection` and record its metadata under the pre-reserved handle `ch`
    ///
    /// `ch` must wrap the slab's current vacant key (checked with a debug assertion).
    /// `init_cid` is the initial destination CID, `loc_cid` the first local CID (already
    /// generated by the caller) and `rem_cid` the peer's CID. `params` is only used for the
    /// qlog "connection started" record.
    fn add_connection(
        &mut self,
        ch: ConnectionHandle,
        version: u32,
        init_cid: ConnectionId,
        loc_cid: ConnectionId,
        rem_cid: ConnectionId,
        addresses: FourTuple,
        now: Instant,
        tls: Box<dyn crypto::Session>,
        transport_config: Arc<TransportConfig>,
        side_args: SideArgs,
        params: &TransportParameters,
    ) -> Connection {
        // Each connection gets its own RNG seed, drawn from the endpoint RNG
        let mut rng_seed = [0; 32];
        self.rng.fill_bytes(&mut rng_seed);
        // Read these before `side_args` is moved into `Connection::new`
        let side = side_args.side();
        let pref_addr_cid = side_args.pref_addr_cid();

        let qlog =
            transport_config.create_qlog_sink(side_args.side(), addresses.remote, init_cid, now);

        qlog.emit_connection_started(
            now,
            loc_cid,
            rem_cid,
            addresses.remote,
            addresses.local_ip,
            params,
        );

        let conn = Connection::new(
            self.config.clone(),
            transport_config,
            init_cid,
            loc_cid,
            rem_cid,
            addresses.remote,
            addresses.local_ip,
            tls,
            self.local_cid_generator.as_ref(),
            now,
            version,
            self.allow_mtud,
            rng_seed,
            side_args,
            qlog,
        );

        // `loc_cid` takes sequence number 0 on path zero
        let mut path_cids = PathLocalCids::default();
        path_cids.cids.insert(path_cids.issued, loc_cid);
        path_cids.issued += 1;

        // The preferred-address CID, when present, takes sequence number 1
        if let Some(cid) = pref_addr_cid {
            debug_assert_eq!(path_cids.issued, 1, "preferred address cid seq must be 1");
            path_cids.cids.insert(path_cids.issued, cid);
            path_cids.issued += 1;
        }

        let id = self.connections.insert(ConnectionMeta {
            init_cid,
            loc_cids: FxHashMap::from_iter([(PathId::ZERO, path_cids)]),
            addresses,
            side,
            reset_token: Default::default(),
        });
        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");

        self.index.insert_conn(addresses, loc_cid, ch, side);

        conn
    }
886
    /// Write an initial packet containing a close frame for `reason` into `buf`
    ///
    /// The packet is addressed to `remote_id` and protected with the supplied initial `crypto`
    /// keys. No connection state is created for it.
    fn initial_close(
        &mut self,
        version: u32,
        addresses: FourTuple,
        crypto: &Keys,
        remote_id: ConnectionId,
        reason: TransportError,
        buf: &mut Vec<u8>,
    ) -> Transmit {
        // Throwaway source CID: generated directly, never registered in the routing index
        let local_id = self.local_cid_generator.generate_cid();
        let number = PacketNumber::U8(0);
        let header = Header::Initial(InitialHeader {
            dst_cid: remote_id,
            src_cid: local_id,
            number,
            token: Bytes::new(),
            version,
        });

        let partial_encode = header.encode(buf);
        // Space left for the close frame once the header and the AEAD tag are accounted for
        let max_len =
            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
        frame::Close::from(reason).encode(buf, max_len);
        // Reserve room for the tag, which `finish` fills in
        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
        partial_encode.finish(
            buf,
            &*crypto.header.local,
            Some((0, Default::default(), &*crypto.packet.local)),
        );
        Transmit {
            destination: addresses.remote,
            ecn: None,
            size: buf.len(),
            segment_size: None,
            src_ip: addresses.local_ip,
        }
    }
927
    /// Access the configuration used by this endpoint
    pub fn config(&self) -> &EndpointConfig {
        &self.config
    }
932
    /// Number of connections currently tracked by this endpoint
    pub fn open_connections(&self) -> usize {
        self.connections.len()
    }
937
    /// Total number of bytes currently buffered for pending `Incoming` connection attempts
    pub fn incoming_buffer_bytes(&self) -> u64 {
        self.all_incoming_buffers_total_bytes
    }
943
    /// Test helper: number of tracked connections, with debug assertions that the routing
    /// index is consistent with it
    #[cfg(test)]
    pub(crate) fn known_connections(&self) -> usize {
        let x = self.connections.len();
        debug_assert_eq!(x, self.index.connection_ids_initial.len());
        // The remaining tables may hold fewer entries than there are connections
        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
        debug_assert!(x >= self.index.incoming_connection_remotes.len());
        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
        x
    }
955
    /// Test helper: number of locally issued connection IDs registered in the routing index
    #[cfg(test)]
    pub(crate) fn known_cids(&self) -> usize {
        self.index.connection_ids.len()
    }
960
961 fn cids_exhausted(&self) -> bool {
966 self.local_cid_generator.cid_len() <= 4
967 && self.local_cid_generator.cid_len() != 0
968 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
969 - self.index.connection_ids.len())
970 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
971 }
972}
973
974impl fmt::Debug for Endpoint {
975 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
976 fmt.debug_struct("Endpoint")
977 .field("rng", &self.rng)
978 .field("index", &self.index)
979 .field("connections", &self.connections)
980 .field("config", &self.config)
981 .field("server_config", &self.server_config)
982 .field("incoming_buffers.len", &self.incoming_buffers.len())
984 .field(
985 "all_incoming_buffers_total_bytes",
986 &self.all_incoming_buffers_total_bytes,
987 )
988 .finish()
989 }
990}
991
/// Datagrams received for one pending [`Incoming`] before the application has decided its fate
#[derive(Default)]
struct IncomingBuffer {
    /// Buffered datagrams, in arrival order
    datagrams: Vec<DatagramConnectionEvent>,
    /// Sum of the lengths of the buffered datagrams
    total_bytes: u64,
}
998
/// Where `ConnectionIndex::get` decided an incoming datagram should go
#[derive(Copy, Clone, Debug)]
enum RouteDatagramTo {
    /// Buffer it for the pending `Incoming` stored at this key of `Endpoint::incoming_buffers`
    Incoming(usize),
    /// Deliver it to this connection, on this path
    Connection(ConnectionHandle, PathId),
}
1005
/// Maps packets to existing connections or pending incoming attempts
#[derive(Default, Debug)]
struct ConnectionIndex {
    /// Routes keyed by the destination CID of a client's first initial packet
    ///
    /// Only consulted for initial and 0-RTT packets. Entries with an empty CID are never stored.
    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
    /// Connection and path for each locally issued, non-empty connection ID
    connection_ids: FxHashMap<ConnectionId, (ConnectionHandle, PathId)>,
    /// Server-side connections whose local CID is zero-length, keyed by address pair
    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
    /// Client-side connections whose local CID is zero-length, keyed by remote address
    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
    /// Reset tokens registered by connections, grouped by remote address
    ///
    /// Used to route a datagram whose trailing bytes match a registered token.
    connection_reset_tokens: ResetTokenTable,
}
1038
1039impl ConnectionIndex {
1040 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1042 if dst_cid.is_empty() {
1043 return;
1044 }
1045 self.connection_ids_initial
1046 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1047 }
1048
1049 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1051 if dst_cid.is_empty() {
1052 return;
1053 }
1054 let removed = self.connection_ids_initial.remove(&dst_cid);
1055 debug_assert!(removed.is_some());
1056 }
1057
1058 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1060 if dst_cid.is_empty() {
1061 return;
1062 }
1063 self.connection_ids_initial.insert(
1064 dst_cid,
1065 RouteDatagramTo::Connection(connection, PathId::ZERO),
1066 );
1067 }
1068
1069 fn insert_conn(
1072 &mut self,
1073 addresses: FourTuple,
1074 dst_cid: ConnectionId,
1075 connection: ConnectionHandle,
1076 side: Side,
1077 ) {
1078 match dst_cid.len() {
1079 0 => match side {
1080 Side::Server => {
1081 self.incoming_connection_remotes
1082 .insert(addresses, connection);
1083 }
1084 Side::Client => {
1085 self.outgoing_connection_remotes
1086 .insert(addresses.remote, connection);
1087 }
1088 },
1089 _ => {
1090 self.connection_ids
1091 .insert(dst_cid, (connection, PathId::ZERO));
1092 }
1093 }
1094 }
1095
1096 fn retire(&mut self, dst_cid: ConnectionId) {
1098 self.connection_ids.remove(&dst_cid);
1099 }
1100
1101 fn remove(&mut self, conn: &ConnectionMeta) {
1103 if conn.side.is_server() {
1104 self.remove_initial(conn.init_cid);
1105 }
1106 for cid in conn.loc_cids.values().flat_map(|pcids| pcids.cids.values()) {
1107 self.connection_ids.remove(cid);
1108 }
1109 self.incoming_connection_remotes.remove(&conn.addresses);
1110 self.outgoing_connection_remotes
1111 .remove(&conn.addresses.remote);
1112 for (remote, token) in conn.reset_token.values() {
1113 self.connection_reset_tokens.remove(*remote, *token);
1114 }
1115 }
1116
1117 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1119 if !datagram.dst_cid().is_empty() {
1120 if let Some(&(ch, path_id)) = self.connection_ids.get(&datagram.dst_cid()) {
1121 return Some(RouteDatagramTo::Connection(ch, path_id));
1122 }
1123 }
1124 if datagram.is_initial() || datagram.is_0rtt() {
1125 if let Some(&ch) = self.connection_ids_initial.get(&datagram.dst_cid()) {
1126 return Some(ch);
1127 }
1128 }
1129 if datagram.dst_cid().is_empty() {
1130 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1131 return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1134 }
1135 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1136 return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1138 }
1139 }
1140 let data = datagram.data();
1141 if data.len() < RESET_TOKEN_SIZE {
1142 return None;
1143 }
1144 self.connection_reset_tokens
1147 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1148 .cloned()
1149 .map(|ch| RouteDatagramTo::Connection(ch, PathId::ZERO))
1150 }
1151}
1152
/// Endpoint-side bookkeeping for one connection
#[derive(Debug)]
pub(crate) struct ConnectionMeta {
    /// Initial destination CID the connection was created with; on the server side it keys
    /// the connection's entry in the initial-CID index
    init_cid: ConnectionId,
    /// Locally issued connection IDs, per path
    loc_cids: FxHashMap<PathId, PathLocalCids>,
    /// Remote address and local IP the connection was created with
    addresses: FourTuple,
    /// Whether this endpoint is the client or the server of the connection
    side: Side,
    /// Reset token (and the remote it was registered for) reported by the connection, per path
    reset_token: FxHashMap<PathId, (SocketAddr, ResetToken)>,
}
1174
/// Local connection IDs issued for a single path
#[derive(Debug, Default)]
struct PathLocalCids {
    /// Number of CIDs issued so far; also the sequence number the next CID will receive
    issued: u64,
    /// Currently registered CIDs, keyed by sequence number
    cids: FxHashMap<u64, ConnectionId>,
}
1185
/// Internal identifier for a connection tracked by an [`Endpoint`]
///
/// Wraps the key of the connection's entry in the endpoint's slab.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ConnectionHandle(pub usize);
1189
1190impl From<ConnectionHandle> for usize {
1191 fn from(x: ConnectionHandle) -> Self {
1192 x.0
1193 }
1194}
1195
1196impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1197 type Output = ConnectionMeta;
1198 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1199 &self[ch.0]
1200 }
1201}
1202
1203impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1204 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1205 &mut self[ch.0]
1206 }
1207}
1208
/// Event resulting from processing a single datagram with `Endpoint::handle`
pub enum DatagramEvent {
    /// The datagram belongs to an existing connection and should be passed to it
    ConnectionEvent(ConnectionHandle, ConnectionEvent),
    /// The datagram starts a new connection attempt
    NewConnection(Incoming),
    /// A response was written to the caller's buffer and should be transmitted
    Response(Transmit),
}
1218
/// An incoming connection attempt for which the application has not yet decided whether to
/// accept, refuse, retry or ignore it
pub struct Incoming {
    /// Time the initial datagram was handled
    received_at: Instant,
    /// Remote address and local IP of the initial datagram
    addresses: FourTuple,
    /// ECN codepoint of the initial datagram
    ecn: Option<EcnCodepoint>,
    /// The decoded (but not yet decrypted) initial packet
    packet: InitialPacket,
    /// Remainder of the datagram after the first packet, if any
    rest: Option<BytesMut>,
    /// Initial keys derived for this attempt
    crypto: Keys,
    /// Token information extracted from the initial packet's header
    token: IncomingToken,
    /// Key of this attempt's entry in `Endpoint::incoming_buffers`
    incoming_idx: usize,
    /// Logs a warning if the `Incoming` is dropped without being handed back to the endpoint
    improper_drop_warner: IncomingImproperDropWarner,
}
1231
impl Incoming {
    /// The local IP address the initial datagram was received on, if it was provided
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.addresses.local_ip
    }

    /// The peer's UDP address
    pub fn remote_address(&self) -> SocketAddr {
        self.addresses.remote
    }

    /// Whether the token carried by the initial packet marks the remote address as validated
    pub fn remote_address_validated(&self) -> bool {
        self.token.validated
    }

    /// Whether `Endpoint::retry` may be called for this attempt
    ///
    /// False when the token already carries a retry source CID.
    pub fn may_retry(&self) -> bool {
        self.token.retry_src_cid.is_none()
    }

    /// The original destination connection ID recorded in the token information
    pub fn orig_dst_cid(&self) -> ConnectionId {
        self.token.orig_dst_cid
    }
}
1269
1270impl fmt::Debug for Incoming {
1271 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1272 f.debug_struct("Incoming")
1273 .field("addresses", &self.addresses)
1274 .field("ecn", &self.ecn)
1275 .field("token", &self.token)
1278 .field("incoming_idx", &self.incoming_idx)
1279 .finish_non_exhaustive()
1281 }
1282}
1283
/// Guard stored in every [`Incoming`]; logs a warning when dropped unless `dismiss` was called
struct IncomingImproperDropWarner;
1285
impl IncomingImproperDropWarner {
    /// Consume the guard without running its `Drop` impl, suppressing the warning
    fn dismiss(self) {
        mem::forget(self);
    }
}
1291
impl Drop for IncomingImproperDropWarner {
    /// Reached only when the guard was not dismissed, i.e. the owning `Incoming` was dropped
    /// without being passed back to the endpoint
    fn drop(&mut self) {
        warn!(
            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
               (may cause memory leak and eventual inability to accept new connections)"
        );
    }
}
1300
/// Errors in the parameters being used to create a new connection
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectError {
    /// The endpoint is stopping
    ///
    /// Not produced in this file; presumably raised by a higher layer.
    #[error("endpoint stopping")]
    EndpointStopping,
    /// Too few local connection IDs remain to start another connection
    #[error("CIDs exhausted")]
    CidsExhausted,
    /// The given server name was rejected
    ///
    /// Not produced directly in this file; presumably reported by the crypto layer.
    #[error("invalid server name: {0}")]
    InvalidServerName(String),
    /// The remote address has port 0 or an unspecified IP
    #[error("invalid remote address: {0}")]
    InvalidRemoteAddress(SocketAddr),
    /// No default client configuration was available
    ///
    /// Not produced in this file; presumably raised by a higher layer.
    #[error("no default client config")]
    NoDefaultClientConfig,
    /// The requested QUIC version is not among the endpoint's supported versions
    #[error("unsupported QUIC version")]
    UnsupportedVersion,
}
1333
/// Error returned when `Endpoint::accept` fails
#[derive(Debug)]
pub struct AcceptError {
    /// Why the connection could not be accepted
    pub cause: ConnectionError,
    /// Response to transmit to the peer, if one was written to the caller's buffer
    pub response: Option<Transmit>,
}
1342
/// Error returned by `Endpoint::retry` when the `Incoming` may not be retried
///
/// Carries the rejected `Incoming` so the caller can still accept, refuse or ignore it.
#[derive(Debug, Error)]
#[error("retry() with validated Incoming")]
pub struct RetryError(Box<Incoming>);
1347
1348impl RetryError {
1349 pub fn into_incoming(self) -> Incoming {
1351 *self.0
1352 }
1353}
1354
/// Reset tokens registered by connections, grouped by the remote address they apply to
///
/// An inner map is removed again as soon as its last token is removed.
#[derive(Default, Debug)]
struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1361
1362impl ResetTokenTable {
1363 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1364 self.0
1365 .entry(remote)
1366 .or_default()
1367 .insert(token, ch)
1368 .is_some()
1369 }
1370
1371 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1372 use std::collections::hash_map::Entry;
1373 match self.0.entry(remote) {
1374 Entry::Vacant(_) => {}
1375 Entry::Occupied(mut e) => {
1376 e.get_mut().remove(&token);
1377 if e.get().is_empty() {
1378 e.remove_entry();
1379 }
1380 }
1381 }
1382 }
1383
1384 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1385 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1386 self.0.get(&remote)?.get(&token)
1387 }
1388}
1389
/// Addresses identifying where a datagram came from and, when known, where it arrived
#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
struct FourTuple {
    /// The peer's address
    remote: SocketAddr,
    /// The local IP address the datagram was received on, if known
    local_ip: Option<IpAddr>,
}