iroh_quinn_proto/
endpoint.rs

1use std::{
2    collections::{HashMap, hash_map},
3    convert::TryFrom,
4    fmt, mem,
5    net::{IpAddr, SocketAddr},
6    ops::{Index, IndexMut},
7    sync::Arc,
8};
9
10use bytes::{BufMut, Bytes, BytesMut};
11use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
12use rustc_hash::FxHashMap;
13use slab::Slab;
14use thiserror::Error;
15use tracing::{debug, error, trace, warn};
16
17use crate::{
18    Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, PathId, RESET_TOKEN_SIZE,
19    ResetToken, Side, Transmit, TransportConfig, TransportError,
20    cid_generator::ConnectionIdGenerator,
21    coding::BufMutExt,
22    config::{ClientConfig, EndpointConfig, ServerConfig},
23    connection::{Connection, ConnectionError, SideArgs},
24    crypto::{self, Keys, UnsupportedVersion},
25    frame,
26    packet::{
27        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
28        PacketNumber, PartialDecode, ProtectedInitialHeader,
29    },
30    shared::{
31        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32        EndpointEvent, EndpointEventInner, IssuedCid,
33    },
34    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
35    transport_parameters::{PreferredAddress, TransportParameters},
36};
37
/// The main entry point to the library
///
/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
/// connection-generated events via `handle` and `handle_event`.
pub struct Endpoint {
    /// Randomness source, used e.g. for the version negotiation grease byte, stateless reset
    /// padding, and retry tokens
    rng: StdRng,
    /// Lookup tables consulted to route an incoming datagram to a connection or to a pending
    /// incoming connection
    index: ConnectionIndex,
    /// Per-connection bookkeeping; a `ConnectionHandle` wraps the slab key of its entry
    connections: Slab<ConnectionMeta>,
    /// Generator for locally issued connection IDs, built from the factory in `config`
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    /// Endpoint-wide configuration
    config: Arc<EndpointConfig>,
    /// Default configuration for incoming connections; replaceable via `set_server_config`
    server_config: Option<Arc<ServerConfig>>,
    /// Whether the underlying UDP socket promises not to fragment packets
    allow_mtud: bool,
    /// Time at which a stateless reset was most recently sent
    last_stateless_reset: Option<Instant>,
    /// Buffered Initial and 0-RTT messages for pending incoming connections
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of `total_bytes` over every entry currently in `incoming_buffers`
    all_incoming_buffers_total_bytes: u64,
}
57
58impl Endpoint {
59    /// Create a new endpoint
60    ///
61    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
62    /// better performance. This requires that outgoing packets are never fragmented, which can be
63    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
64    ///
65    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
66    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
67    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
68    /// using [`EndpointConfig::rng_seed`].
69    pub fn new(
70        config: Arc<EndpointConfig>,
71        server_config: Option<Arc<ServerConfig>>,
72        allow_mtud: bool,
73        rng_seed: Option<[u8; 32]>,
74    ) -> Self {
75        let rng_seed = rng_seed.or(config.rng_seed);
76        Self {
77            rng: rng_seed.map_or(StdRng::from_os_rng(), StdRng::from_seed),
78            index: ConnectionIndex::default(),
79            connections: Slab::new(),
80            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
81            config,
82            server_config,
83            allow_mtud,
84            last_stateless_reset: None,
85            incoming_buffers: Slab::new(),
86            all_incoming_buffers_total_bytes: 0,
87        }
88    }
89
    /// Replace the server configuration, affecting new incoming connections only
    ///
    /// Passing `None` removes the server configuration altogether.
    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
        self.server_config = server_config;
    }
94
95    /// Process `EndpointEvent`s emitted from related `Connection`s
96    ///
97    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
98    pub fn handle_event(
99        &mut self,
100        ch: ConnectionHandle,
101        event: EndpointEvent,
102    ) -> Option<ConnectionEvent> {
103        use EndpointEventInner::*;
104        match event.0 {
105            NeedIdentifiers(path_id, now, n) => {
106                return Some(self.send_new_identifiers(path_id, now, ch, n));
107            }
108            ResetToken(path_id, remote, token) => {
109                if let Some(old) = self.connections[ch]
110                    .reset_token
111                    .insert(path_id, (remote, token))
112                {
113                    self.index.connection_reset_tokens.remove(old.0, old.1);
114                }
115                if self.index.connection_reset_tokens.insert(remote, token, ch) {
116                    warn!("duplicate reset token");
117                }
118            }
119            RetireResetToken(path_id) => {
120                if let Some(old) = self.connections[ch].reset_token.remove(&path_id) {
121                    self.index.connection_reset_tokens.remove(old.0, old.1);
122                }
123            }
124            RetireConnectionId(now, path_id, seq, allow_more_cids) => {
125                if let Some(cid) = self.connections[ch]
126                    .loc_cids
127                    .get_mut(&path_id)
128                    .and_then(|pcid| pcid.cids.remove(&seq))
129                {
130                    trace!(%path_id, "local CID retired {}: {}", seq, cid);
131                    self.index.retire(cid);
132                    if allow_more_cids {
133                        return Some(self.send_new_identifiers(path_id, now, ch, 1));
134                    }
135                }
136            }
137            Drained => {
138                if let Some(conn) = self.connections.try_remove(ch.0) {
139                    self.index.remove(&conn);
140                } else {
141                    // This indicates a bug in downstream code, which could cause spurious
142                    // connection loss instead of this error if the CID was (re)allocated prior to
143                    // the illegal call.
144                    error!(id = ch.0, "unknown connection drained");
145                }
146            }
147        }
148        None
149    }
150
151    /// Process an incoming UDP datagram
152    pub fn handle(
153        &mut self,
154        now: Instant,
155        remote: SocketAddr,
156        local_ip: Option<IpAddr>,
157        ecn: Option<EcnCodepoint>,
158        data: BytesMut,
159        buf: &mut Vec<u8>,
160    ) -> Option<DatagramEvent> {
161        // Partially decode packet or short-circuit if unable
162        let datagram_len = data.len();
163        let mut event = match PartialDecode::new(
164            data,
165            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
166            &self.config.supported_versions,
167            self.config.grease_quic_bit,
168        ) {
169            Ok((first_decode, remaining)) => DatagramConnectionEvent {
170                now,
171                remote,
172                path_id: PathId::ZERO, // Corrected later for existing paths
173                ecn,
174                first_decode,
175                remaining,
176            },
177            Err(PacketDecodeError::UnsupportedVersion {
178                src_cid,
179                dst_cid,
180                version,
181            }) => {
182                if self.server_config.is_none() {
183                    debug!("dropping packet with unsupported version");
184                    return None;
185                }
186                trace!("sending version negotiation");
187                // Negotiate versions
188                Header::VersionNegotiate {
189                    random: self.rng.random::<u8>() | 0x40,
190                    src_cid: dst_cid,
191                    dst_cid: src_cid,
192                }
193                .encode(buf);
194                // Grease with a reserved version
195                buf.write::<u32>(match version {
196                    0x0a1a_2a3a => 0x0a1a_2a4a,
197                    _ => 0x0a1a_2a3a,
198                });
199                for &version in &self.config.supported_versions {
200                    buf.write(version);
201                }
202                return Some(DatagramEvent::Response(Transmit {
203                    destination: remote,
204                    ecn: None,
205                    size: buf.len(),
206                    segment_size: None,
207                    src_ip: local_ip,
208                }));
209            }
210            Err(e) => {
211                trace!("malformed header: {}", e);
212                return None;
213            }
214        };
215
216        let addresses = FourTuple { remote, local_ip };
217        let dst_cid = event.first_decode.dst_cid();
218
219        if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
220            event.path_id = match route_to {
221                RouteDatagramTo::Incoming(_) => PathId::ZERO,
222                RouteDatagramTo::Connection(_, path_id) => path_id,
223            };
224            match route_to {
225                RouteDatagramTo::Incoming(incoming_idx) => {
226                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
227                    let config = &self.server_config.as_ref().unwrap();
228
229                    if incoming_buffer
230                        .total_bytes
231                        .checked_add(datagram_len as u64)
232                        .is_some_and(|n| n <= config.incoming_buffer_size)
233                        && self
234                            .all_incoming_buffers_total_bytes
235                            .checked_add(datagram_len as u64)
236                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
237                    {
238                        incoming_buffer.datagrams.push(event);
239                        incoming_buffer.total_bytes += datagram_len as u64;
240                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
241                    }
242
243                    None
244                }
245                RouteDatagramTo::Connection(ch, _path_id) => Some(DatagramEvent::ConnectionEvent(
246                    ch,
247                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
248                )),
249            }
250        } else if event.first_decode.initial_header().is_some() {
251            // Potentially create a new connection
252
253            self.handle_first_packet(datagram_len, event, addresses, buf)
254        } else if event.first_decode.has_long_header() {
255            debug!(
256                "ignoring non-initial packet for unknown connection {}",
257                dst_cid
258            );
259            None
260        } else if !event.first_decode.is_initial()
261            && self.local_cid_generator.validate(dst_cid).is_err()
262        {
263            debug!("dropping packet with invalid CID");
264            None
265        } else if dst_cid.is_empty() {
266            trace!("dropping unrecognized short packet without ID");
267            None
268        } else {
269            // If we got this far, we're receiving a seemingly valid packet for an unknown
270            // connection. Send a stateless reset if possible.
271            self.stateless_reset(now, datagram_len, addresses, dst_cid, buf)
272                .map(DatagramEvent::Response)
273        }
274    }
275
276    /// Builds a stateless reset packet to respond with
277    fn stateless_reset(
278        &mut self,
279        now: Instant,
280        inciting_dgram_len: usize,
281        addresses: FourTuple,
282        dst_cid: ConnectionId,
283        buf: &mut Vec<u8>,
284    ) -> Option<Transmit> {
285        if self
286            .last_stateless_reset
287            .is_some_and(|last| last + self.config.min_reset_interval > now)
288        {
289            debug!("ignoring unexpected packet within minimum stateless reset interval");
290            return None;
291        }
292
293        /// Minimum amount of padding for the stateless reset to look like a short-header packet
294        const MIN_PADDING_LEN: usize = 5;
295
296        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
297        // smaller than the inciting packet.
298        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
299            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
300            _ => {
301                debug!(
302                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
303                    inciting_dgram_len
304                );
305                return None;
306            }
307        };
308
309        debug!(
310            "sending stateless reset for {} to {}",
311            dst_cid, addresses.remote
312        );
313        self.last_stateless_reset = Some(now);
314        // Resets with at least this much padding can't possibly be distinguished from real packets
315        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
316        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
317            max_padding_len
318        } else {
319            self.rng
320                .random_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
321        };
322        buf.reserve(padding_len + RESET_TOKEN_SIZE);
323        buf.resize(padding_len, 0);
324        self.rng.fill_bytes(&mut buf[0..padding_len]);
325        buf[0] = 0b0100_0000 | (buf[0] >> 2);
326        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
327
328        debug_assert!(buf.len() < inciting_dgram_len);
329
330        Some(Transmit {
331            destination: addresses.remote,
332            ecn: None,
333            size: buf.len(),
334            segment_size: None,
335            src_ip: addresses.local_ip,
336        })
337    }
338
339    /// Initiate a connection
340    pub fn connect(
341        &mut self,
342        now: Instant,
343        config: ClientConfig,
344        remote: SocketAddr,
345        server_name: &str,
346    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
347        if self.cids_exhausted() {
348            return Err(ConnectError::CidsExhausted);
349        }
350        if remote.port() == 0 || remote.ip().is_unspecified() {
351            return Err(ConnectError::InvalidRemoteAddress(remote));
352        }
353        if !self.config.supported_versions.contains(&config.version) {
354            return Err(ConnectError::UnsupportedVersion);
355        }
356
357        let remote_id = (config.initial_dst_cid_provider)();
358        trace!(initial_dcid = %remote_id);
359
360        let ch = ConnectionHandle(self.connections.vacant_key());
361        let loc_cid = self.new_cid(ch, PathId::ZERO);
362        let params = TransportParameters::new(
363            &config.transport,
364            &self.config,
365            self.local_cid_generator.as_ref(),
366            loc_cid,
367            None,
368            &mut self.rng,
369        );
370        let tls = config
371            .crypto
372            .start_session(config.version, server_name, &params)?;
373
374        let conn = self.add_connection(
375            ch,
376            config.version,
377            remote_id,
378            loc_cid,
379            remote_id,
380            FourTuple {
381                remote,
382                local_ip: None,
383            },
384            now,
385            tls,
386            config.transport,
387            SideArgs::Client {
388                token_store: config.token_store,
389                server_name: server_name.into(),
390            },
391            &params,
392        );
393        Ok((ch, conn))
394    }
395
396    /// Generates new CIDs and creates message to send to the connection state
397    fn send_new_identifiers(
398        &mut self,
399        path_id: PathId,
400        now: Instant,
401        ch: ConnectionHandle,
402        num: u64,
403    ) -> ConnectionEvent {
404        let mut ids = vec![];
405        for _ in 0..num {
406            let id = self.new_cid(ch, path_id);
407            let cid_meta = self.connections[ch].loc_cids.entry(path_id).or_default();
408            let sequence = cid_meta.issued;
409            cid_meta.issued += 1;
410            cid_meta.cids.insert(sequence, id);
411            ids.push(IssuedCid {
412                path_id,
413                sequence,
414                id,
415                reset_token: ResetToken::new(&*self.config.reset_key, id),
416            });
417        }
418        ConnectionEvent(ConnectionEventInner::NewIdentifiers(
419            ids,
420            now,
421            self.local_cid_generator.cid_len(),
422            self.local_cid_generator.cid_lifetime(),
423        ))
424    }
425
426    /// Generate a connection ID for `ch`
427    fn new_cid(&mut self, ch: ConnectionHandle, path_id: PathId) -> ConnectionId {
428        loop {
429            let cid = self.local_cid_generator.generate_cid();
430            if cid.is_empty() {
431                // Zero-length CID; nothing to track
432                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
433                return cid;
434            }
435            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
436                e.insert((ch, path_id));
437                break cid;
438            }
439        }
440    }
441
    /// Handle an Initial packet that matched no known connection or pending incoming connection
    ///
    /// On success, allocates a buffer for follow-up datagrams, registers the packet's destination
    /// CID for routing to that buffer, and returns `DatagramEvent::NewConnection` carrying the
    /// resulting `Incoming`. Otherwise returns either a response to transmit (written into `buf`)
    /// or `None` if the packet is dropped.
    ///
    /// Panics if `event.first_decode` carries no initial header.
    fn handle_first_packet(
        &mut self,
        datagram_len: usize,
        event: DatagramConnectionEvent,
        addresses: FourTuple,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let dst_cid = event.first_decode.dst_cid();
        let header = event.first_decode.initial_header().unwrap();

        // Without a server configuration no connection can be accepted; attempt a stateless
        // reset instead.
        let Some(server_config) = &self.server_config else {
            debug!("packet for unrecognized connection {}", dst_cid);
            return self
                .stateless_reset(event.now, datagram_len, addresses, dst_cid, buf)
                .map(DatagramEvent::Response);
        };

        // Datagrams carrying a first Initial must meet the minimum size
        if datagram_len < MIN_INITIAL_SIZE as usize {
            debug!("ignoring short initial for connection {}", dst_cid);
            return None;
        }

        // Initial keys are derived from the packet's version and destination CID
        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
            Ok(keys) => keys,
            Err(UnsupportedVersion) => {
                // This probably indicates that the user set supported_versions incorrectly in
                // `EndpointConfig`.
                debug!(
                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
                    header.version
                );
                return None;
            }
        };

        // Checks that need only the still-protected header; failures are answered with a close
        if let Err(reason) = self.early_validate_first_packet(header) {
            return Some(DatagramEvent::Response(self.initial_close(
                header.version,
                addresses,
                &crypto,
                header.src_cid,
                reason,
                buf,
            )));
        }

        // Complete decoding using the remote header key; this consumes `event.first_decode`
        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
            Ok(packet) => packet,
            Err(e) => {
                trace!("unable to decode initial packet: {}", e);
                return None;
            }
        };

        if !packet.reserved_bits_valid() {
            debug!("dropping connection attempt with invalid reserved bits");
            return None;
        }

        let Header::Initial(header) = packet.header else {
            panic!("non-initial packet in handle_first_packet()");
        };

        // Take an owned handle to the server configuration; presence was checked above
        let server_config = self.server_config.as_ref().unwrap().clone();

        let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
            Ok(token) => token,
            Err(InvalidRetryTokenError) => {
                debug!("rejecting invalid retry token");
                return Some(DatagramEvent::Response(self.initial_close(
                    header.version,
                    addresses,
                    &crypto,
                    header.src_cid,
                    TransportError::INVALID_TOKEN(""),
                    buf,
                )));
            }
        };

        // From here on, further datagrams addressed to this destination CID are routed to the
        // new buffer until the `Incoming` is accepted, refused, retried or ignored.
        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
        self.index
            .insert_initial_incoming(header.dst_cid, incoming_idx);

        Some(DatagramEvent::NewConnection(Incoming {
            received_at: event.now,
            addresses,
            ecn: event.ecn,
            packet: InitialPacket {
                header,
                header_data: packet.header_data,
                payload: packet.payload,
            },
            rest: event.remaining,
            crypto,
            token,
            incoming_idx,
            improper_drop_warner: IncomingImproperDropWarner,
        }))
    }
542
    /// Attempt to accept this incoming connection (an error may still occur)
    ///
    /// `server_config` overrides the endpoint's own server configuration for this connection
    /// when it is `Some`. With `None` the endpoint's configuration is used, and this method
    /// panics if the endpoint has none.
    ///
    /// On failure, the returned `AcceptError` carries the cause and, where applicable, a
    /// response written into `buf` that should be transmitted to the peer.
    // box err to avoid clippy::result_large_err
    pub fn accept(
        &mut self,
        mut incoming: Incoming,
        now: Instant,
        buf: &mut Vec<u8>,
        server_config: Option<Arc<ServerConfig>>,
    ) -> Result<(ConnectionHandle, Connection), Box<AcceptError>> {
        let remote_address_validated = incoming.remote_address_validated();
        incoming.improper_drop_warner.dismiss();
        // Reclaim the datagrams buffered for this attempt and release their byte budget
        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;

        let packet_number = incoming.packet.header.number.expand(0);
        let InitialHeader {
            src_cid,
            dst_cid,
            version,
            ..
        } = incoming.packet.header;
        let server_config =
            server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());

        // Give up if the Initial has been waiting for at least the idle timeout
        if server_config
            .transport
            .max_idle_timeout
            .is_some_and(|timeout| {
                incoming.received_at + Duration::from_millis(timeout.into()) <= now
            })
        {
            debug!("abandoning accept of stale initial");
            self.index.remove_initial(dst_cid);
            return Err(Box::new(AcceptError {
                cause: ConnectionError::TimedOut,
                response: None,
            }));
        }

        if self.cids_exhausted() {
            debug!("refusing connection");
            self.index.remove_initial(dst_cid);
            return Err(Box::new(AcceptError {
                cause: ConnectionError::CidsExhausted,
                response: Some(self.initial_close(
                    version,
                    incoming.addresses,
                    &incoming.crypto,
                    src_cid,
                    TransportError::CONNECTION_REFUSED(""),
                    buf,
                )),
            }));
        }

        // Decrypt the Initial's payload in place; failure means it could not be authenticated
        if incoming
            .crypto
            .packet
            .remote
            .decrypt(
                PathId::ZERO,
                packet_number,
                &incoming.packet.header_data,
                &mut incoming.packet.payload,
            )
            .is_err()
        {
            debug!(packet_number, "failed to authenticate initial packet");
            self.index.remove_initial(dst_cid);
            return Err(Box::new(AcceptError {
                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
                response: None,
            }));
        };

        // Reserve the handle the connection will be stored under and issue its first local CID
        let ch = ConnectionHandle(self.connections.vacant_key());
        let loc_cid = self.new_cid(ch, PathId::ZERO);
        let mut params = TransportParameters::new(
            &server_config.transport,
            &self.config,
            self.local_cid_generator.as_ref(),
            loc_cid,
            Some(&server_config),
            &mut self.rng,
        );
        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
        params.retry_src_cid = incoming.token.retry_src_cid;
        // A preferred address is advertised together with a dedicated CID and reset token
        let mut pref_addr_cid = None;
        if server_config.has_preferred_address() {
            let cid = self.new_cid(ch, PathId::ZERO);
            pref_addr_cid = Some(cid);
            params.preferred_address = Some(PreferredAddress {
                address_v4: server_config.preferred_address_v4,
                address_v6: server_config.preferred_address_v6,
                connection_id: cid,
                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
            });
        }

        let tls = server_config.crypto.clone().start_session(version, &params);
        let transport_config = server_config.transport.clone();
        let mut conn = self.add_connection(
            ch,
            version,
            dst_cid,
            loc_cid,
            src_cid,
            incoming.addresses,
            incoming.received_at,
            tls,
            transport_config,
            SideArgs::Server {
                server_config,
                pref_addr_cid,
                path_validated: remote_address_validated,
            },
            &params,
        );
        // Route datagrams addressed to the client-chosen initial DCID to the new connection
        self.index.insert_initial(dst_cid, ch);

        match conn.handle_first_packet(
            incoming.received_at,
            incoming.addresses.remote,
            incoming.ecn,
            packet_number,
            incoming.packet,
            incoming.rest,
        ) {
            Ok(()) => {
                trace!(id = ch.0, icid = %dst_cid, "new connection");

                // Replay the datagrams that were buffered while the decision was pending
                for event in incoming_buffer.datagrams {
                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
                }

                Ok((ch, conn))
            }
            Err(e) => {
                debug!("handshake failed: {}", e);
                // Undo the registration performed above by draining the connection right away
                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
                // Only transport errors are reported to the peer
                let response = match e {
                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
                        version,
                        incoming.addresses,
                        &incoming.crypto,
                        src_cid,
                        e.clone(),
                        buf,
                    )),
                    _ => None,
                };
                Err(Box::new(AcceptError { cause: e, response }))
            }
        }
    }
699
700    /// Check if we should refuse a connection attempt regardless of the packet's contents
701    fn early_validate_first_packet(
702        &mut self,
703        header: &ProtectedInitialHeader,
704    ) -> Result<(), TransportError> {
705        let config = &self.server_config.as_ref().unwrap();
706        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
707            return Err(TransportError::CONNECTION_REFUSED(""));
708        }
709
710        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
711        // bytes. If this is a Retry packet, then the length must instead match our usual CID
712        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
713        // also need to validate CID length for those after decoding the token.
714        if header.dst_cid.len() < 8
715            && (header.token_pos.is_empty()
716                || header.dst_cid.len() != self.local_cid_generator.cid_len())
717        {
718            debug!(
719                "rejecting connection due to invalid DCID length {}",
720                header.dst_cid.len()
721            );
722            return Err(TransportError::PROTOCOL_VIOLATION(
723                "invalid destination CID length",
724            ));
725        }
726
727        Ok(())
728    }
729
730    /// Reject this incoming connection attempt
731    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
732        self.clean_up_incoming(&incoming);
733        incoming.improper_drop_warner.dismiss();
734
735        self.initial_close(
736            incoming.packet.header.version,
737            incoming.addresses,
738            &incoming.crypto,
739            incoming.packet.header.src_cid,
740            TransportError::CONNECTION_REFUSED(""),
741            buf,
742        )
743    }
744
745    /// Respond with a retry packet, requiring the client to retry with address validation
746    ///
747    /// Errors if `incoming.may_retry()` is false.
748    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
749        if !incoming.may_retry() {
750            return Err(RetryError(Box::new(incoming)));
751        }
752
753        self.clean_up_incoming(&incoming);
754        incoming.improper_drop_warner.dismiss();
755
756        let server_config = self.server_config.as_ref().unwrap();
757
758        // First Initial
759        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
760        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
761        // with established connections. In the unlikely event that a collision occurs
762        // between two connections in the initial phase, both will fail fast and may be
763        // retried by the application layer.
764        let loc_cid = self.local_cid_generator.generate_cid();
765
766        let payload = TokenPayload::Retry {
767            address: incoming.addresses.remote,
768            orig_dst_cid: incoming.packet.header.dst_cid,
769            issued: server_config.time_source.now(),
770        };
771        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
772
773        let header = Header::Retry {
774            src_cid: loc_cid,
775            dst_cid: incoming.packet.header.src_cid,
776            version: incoming.packet.header.version,
777        };
778
779        let encode = header.encode(buf);
780        buf.put_slice(&token);
781        buf.extend_from_slice(&server_config.crypto.retry_tag(
782            incoming.packet.header.version,
783            incoming.packet.header.dst_cid,
784            buf,
785        ));
786        encode.finish(buf, &*incoming.crypto.header.local, None);
787
788        Ok(Transmit {
789            destination: incoming.addresses.remote,
790            ecn: None,
791            size: buf.len(),
792            segment_size: None,
793            src_ip: incoming.addresses.local_ip,
794        })
795    }
796
    /// Ignore this incoming connection attempt, not sending any packet in response
    ///
    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
    pub fn ignore(&mut self, incoming: Incoming) {
        // Release the routing entry and the datagram buffer held for this attempt
        self.clean_up_incoming(&incoming);
        // The attempt has now been disposed of properly
        incoming.improper_drop_warner.dismiss();
    }
805
806    /// Clean up endpoint data structures associated with an `Incoming`.
807    fn clean_up_incoming(&mut self, incoming: &Incoming) {
808        self.index.remove_initial(incoming.packet.header.dst_cid);
809        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
810        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
811    }
812
813    fn add_connection(
814        &mut self,
815        ch: ConnectionHandle,
816        version: u32,
817        init_cid: ConnectionId,
818        loc_cid: ConnectionId,
819        rem_cid: ConnectionId,
820        addresses: FourTuple,
821        now: Instant,
822        tls: Box<dyn crypto::Session>,
823        transport_config: Arc<TransportConfig>,
824        side_args: SideArgs,
825        // Only used for qlog.
826        params: &TransportParameters,
827    ) -> Connection {
828        let mut rng_seed = [0; 32];
829        self.rng.fill_bytes(&mut rng_seed);
830        let side = side_args.side();
831        let pref_addr_cid = side_args.pref_addr_cid();
832
833        let qlog =
834            transport_config.create_qlog_sink(side_args.side(), addresses.remote, init_cid, now);
835
836        qlog.emit_connection_started(
837            now,
838            loc_cid,
839            rem_cid,
840            addresses.remote,
841            addresses.local_ip,
842            params,
843        );
844
845        let conn = Connection::new(
846            self.config.clone(),
847            transport_config,
848            init_cid,
849            loc_cid,
850            rem_cid,
851            addresses.remote,
852            addresses.local_ip,
853            tls,
854            self.local_cid_generator.as_ref(),
855            now,
856            version,
857            self.allow_mtud,
858            rng_seed,
859            side_args,
860            qlog,
861        );
862
863        let mut path_cids = PathLocalCids::default();
864        path_cids.cids.insert(path_cids.issued, loc_cid);
865        path_cids.issued += 1;
866
867        if let Some(cid) = pref_addr_cid {
868            debug_assert_eq!(path_cids.issued, 1, "preferred address cid seq must be 1");
869            path_cids.cids.insert(path_cids.issued, cid);
870            path_cids.issued += 1;
871        }
872
873        let id = self.connections.insert(ConnectionMeta {
874            init_cid,
875            loc_cids: FxHashMap::from_iter([(PathId::ZERO, path_cids)]),
876            addresses,
877            side,
878            reset_token: Default::default(),
879        });
880        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
881
882        self.index.insert_conn(addresses, loc_cid, ch, side);
883
884        conn
885    }
886
887    fn initial_close(
888        &mut self,
889        version: u32,
890        addresses: FourTuple,
891        crypto: &Keys,
892        remote_id: ConnectionId,
893        reason: TransportError,
894        buf: &mut Vec<u8>,
895    ) -> Transmit {
896        // We don't need to worry about CID collisions in initial closes because the peer
897        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
898        // unexpected response.
899        let local_id = self.local_cid_generator.generate_cid();
900        let number = PacketNumber::U8(0);
901        let header = Header::Initial(InitialHeader {
902            dst_cid: remote_id,
903            src_cid: local_id,
904            number,
905            token: Bytes::new(),
906            version,
907        });
908
909        let partial_encode = header.encode(buf);
910        let max_len =
911            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
912        frame::Close::from(reason).encode(buf, max_len);
913        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
914        partial_encode.finish(
915            buf,
916            &*crypto.header.local,
917            Some((0, Default::default(), &*crypto.packet.local)),
918        );
919        Transmit {
920            destination: addresses.remote,
921            ecn: None,
922            size: buf.len(),
923            segment_size: None,
924            src_ip: addresses.local_ip,
925        }
926    }
927
928    /// Access the configuration used by this endpoint
929    pub fn config(&self) -> &EndpointConfig {
930        &self.config
931    }
932
933    /// Number of connections that are currently open
934    pub fn open_connections(&self) -> usize {
935        self.connections.len()
936    }
937
938    /// Counter for the number of bytes currently used
939    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
940    pub fn incoming_buffer_bytes(&self) -> u64 {
941        self.all_incoming_buffers_total_bytes
942    }
943
944    #[cfg(test)]
945    pub(crate) fn known_connections(&self) -> usize {
946        let x = self.connections.len();
947        debug_assert_eq!(x, self.index.connection_ids_initial.len());
948        // Not all connections have known reset tokens
949        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
950        // Not all connections have unique remotes, and 0-length CIDs might not be in use.
951        debug_assert!(x >= self.index.incoming_connection_remotes.len());
952        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
953        x
954    }
955
956    #[cfg(test)]
957    pub(crate) fn known_cids(&self) -> usize {
958        self.index.connection_ids.len()
959    }
960
961    /// Whether we've used up 3/4 of the available CID space
962    ///
963    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
964    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
965    fn cids_exhausted(&self) -> bool {
966        self.local_cid_generator.cid_len() <= 4
967            && self.local_cid_generator.cid_len() != 0
968            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
969                - self.index.connection_ids.len())
970                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
971    }
972}
973
974impl fmt::Debug for Endpoint {
975    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
976        fmt.debug_struct("Endpoint")
977            .field("rng", &self.rng)
978            .field("index", &self.index)
979            .field("connections", &self.connections)
980            .field("config", &self.config)
981            .field("server_config", &self.server_config)
982            // incoming_buffers too large
983            .field("incoming_buffers.len", &self.incoming_buffers.len())
984            .field(
985                "all_incoming_buffers_total_bytes",
986                &self.all_incoming_buffers_total_bytes,
987            )
988            .finish()
989    }
990}
991
/// Buffered Initial and 0-RTT messages for a pending incoming connection
#[derive(Default)]
struct IncomingBuffer {
    /// Datagram events held back for the pending connection
    datagrams: Vec<DatagramConnectionEvent>,
    /// Byte count accounted to this buffer
    ///
    /// Subtracted from the endpoint-wide total when the buffer is removed.
    total_bytes: u64,
}
998
/// Part of protocol state incoming datagrams can be routed to
#[derive(Copy, Clone, Debug)]
enum RouteDatagramTo {
    /// A pending incoming connection, identified by the key it was registered under
    Incoming(usize),
    /// An existing connection, together with the path the datagram is attributed to
    Connection(ConnectionHandle, PathId),
}
1005
/// Maps packets to existing connections
#[derive(Default, Debug)]
struct ConnectionIndex {
    /// Identifies connections based on the initial DCID the peer utilized
    ///
    /// An entry refers either to a pending incoming connection or to an existing connection.
    /// Zero-length DCIDs are never stored here.
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    ///
    /// Used by the server, not the client.
    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
    /// Identifies connections based on locally created CIDs
    ///
    /// Uses a cheaper hash function since keys are locally created
    connection_ids: FxHashMap<ConnectionId, (ConnectionHandle, PathId)>,
    /// Identifies incoming connections with zero-length CIDs
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
    /// Identifies outgoing connections with zero-length CIDs
    ///
    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
    /// require a unique four-tuple, so at most one client connection with zero-length local CIDs
    /// may be established per remote. We must omit the local address from the key because we don't
    /// necessarily know what address we're sending from, and hence receiving at.
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
    /// Reset tokens provided by the peer for the CID each connection is currently sending to
    ///
    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
    /// recipient, if any.
    connection_reset_tokens: ResetTokenTable,
}
1038
1039impl ConnectionIndex {
1040    /// Associate an incoming connection with its initial destination CID
1041    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1042        if dst_cid.is_empty() {
1043            return;
1044        }
1045        self.connection_ids_initial
1046            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1047    }
1048
1049    /// Remove an association with an initial destination CID
1050    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1051        if dst_cid.is_empty() {
1052            return;
1053        }
1054        let removed = self.connection_ids_initial.remove(&dst_cid);
1055        debug_assert!(removed.is_some());
1056    }
1057
1058    /// Associate a connection with its initial destination CID
1059    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1060        if dst_cid.is_empty() {
1061            return;
1062        }
1063        self.connection_ids_initial.insert(
1064            dst_cid,
1065            RouteDatagramTo::Connection(connection, PathId::ZERO),
1066        );
1067    }
1068
1069    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1070    /// its current 4-tuple
1071    fn insert_conn(
1072        &mut self,
1073        addresses: FourTuple,
1074        dst_cid: ConnectionId,
1075        connection: ConnectionHandle,
1076        side: Side,
1077    ) {
1078        match dst_cid.len() {
1079            0 => match side {
1080                Side::Server => {
1081                    self.incoming_connection_remotes
1082                        .insert(addresses, connection);
1083                }
1084                Side::Client => {
1085                    self.outgoing_connection_remotes
1086                        .insert(addresses.remote, connection);
1087                }
1088            },
1089            _ => {
1090                self.connection_ids
1091                    .insert(dst_cid, (connection, PathId::ZERO));
1092            }
1093        }
1094    }
1095
1096    /// Discard a connection ID
1097    fn retire(&mut self, dst_cid: ConnectionId) {
1098        self.connection_ids.remove(&dst_cid);
1099    }
1100
1101    /// Remove all references to a connection
1102    fn remove(&mut self, conn: &ConnectionMeta) {
1103        if conn.side.is_server() {
1104            self.remove_initial(conn.init_cid);
1105        }
1106        for cid in conn.loc_cids.values().flat_map(|pcids| pcids.cids.values()) {
1107            self.connection_ids.remove(cid);
1108        }
1109        self.incoming_connection_remotes.remove(&conn.addresses);
1110        self.outgoing_connection_remotes
1111            .remove(&conn.addresses.remote);
1112        for (remote, token) in conn.reset_token.values() {
1113            self.connection_reset_tokens.remove(*remote, *token);
1114        }
1115    }
1116
1117    /// Find the existing connection that `datagram` should be routed to, if any
1118    fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1119        if !datagram.dst_cid().is_empty() {
1120            if let Some(&(ch, path_id)) = self.connection_ids.get(&datagram.dst_cid()) {
1121                return Some(RouteDatagramTo::Connection(ch, path_id));
1122            }
1123        }
1124        if datagram.is_initial() || datagram.is_0rtt() {
1125            if let Some(&ch) = self.connection_ids_initial.get(&datagram.dst_cid()) {
1126                return Some(ch);
1127            }
1128        }
1129        if datagram.dst_cid().is_empty() {
1130            if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1131                // Never multipath because QUIC-MULTIPATH 1.1 mandates the use of non-zero
1132                // length CIDs.  So this is always PathId::ZERO.
1133                return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1134            }
1135            if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1136                // Like above, QUIC-MULTIPATH 1.1 mandates the use of non-zero length CIDs.
1137                return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1138            }
1139        }
1140        let data = datagram.data();
1141        if data.len() < RESET_TOKEN_SIZE {
1142            return None;
1143        }
1144        // For stateless resets the PathId is meaningless since it closes the entire
1145        // connection regardless of path.  So use PathId::ZERO.
1146        self.connection_reset_tokens
1147            .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1148            .cloned()
1149            .map(|ch| RouteDatagramTo::Connection(ch, PathId::ZERO))
1150    }
1151}
1152
/// Endpoint-side bookkeeping for a single connection
#[derive(Debug)]
pub(crate) struct ConnectionMeta {
    /// Initial destination CID of the connection
    ///
    /// For server-side connections this is the key under which the connection is registered
    /// in the index of initial destination CIDs.
    init_cid: ConnectionId,
    /// Locally issued CIDs for each path
    loc_cids: FxHashMap<PathId, PathLocalCids>,
    /// Remote/local addresses the connection began with
    ///
    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
    /// bother keeping it up to date.
    addresses: FourTuple,
    /// Whether this endpoint acts as client or server for the connection
    side: Side,
    /// Reset tokens provided by the peer for CIDs we're currently sending to
    ///
    /// Since each reset token is for a CID, it is also for a fixed remote address which is
    /// also stored. This allows us to look up which reset tokens we might expect from a
    /// given remote address, see [`ResetTokenTable`].
    ///
    /// Each path has its own active CID. We use the [`PathId`] as a unique index, allowing
    /// us to retire the reset token when a path is abandoned.
    reset_token: FxHashMap<PathId, (SocketAddr, ResetToken)>,
}
1174
/// Local connection IDs for a single path
#[derive(Debug, Default)]
struct PathLocalCids {
    /// Number of connection IDs that have been issued in (PATH_)NEW_CONNECTION_ID frames
    ///
    /// Another way of saying this is that this is the next sequence number to be issued.
    issued: u64,
    /// Issued CIDs indexed by their sequence number.
    cids: FxHashMap<u64, ConnectionId>,
}
1185
/// Internal identifier for a `Connection` currently associated with an endpoint
///
/// The wrapped value is the key of the connection's entry in the endpoint's connection slab.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ConnectionHandle(pub usize);
1189
1190impl From<ConnectionHandle> for usize {
1191    fn from(x: ConnectionHandle) -> Self {
1192        x.0
1193    }
1194}
1195
1196impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1197    type Output = ConnectionMeta;
1198    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1199        &self[ch.0]
1200    }
1201}
1202
1203impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1204    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1205        &mut self[ch.0]
1206    }
1207}
1208
/// Event resulting from processing a single datagram
pub enum DatagramEvent {
    /// The datagram is redirected to its `Connection`
    ///
    /// Carries the handle of the target connection and the event to deliver to it.
    ConnectionEvent(ConnectionHandle, ConnectionEvent),
    /// The datagram may result in starting a new `Connection`
    NewConnection(Incoming),
    /// Response generated directly by the endpoint
    Response(Transmit),
}
1218
/// An incoming connection for which the server has not yet begun its part of the handshake.
pub struct Incoming {
    // NOTE(review): presumably the time the first packet was received — confirm at the
    // construction site.
    received_at: Instant,
    /// Peer address and local IP of the connection attempt
    addresses: FourTuple,
    // NOTE(review): presumably the ECN codepoint of the first datagram — confirm.
    ecn: Option<EcnCodepoint>,
    /// The Initial packet of this connection attempt
    packet: InitialPacket,
    // NOTE(review): presumably the remainder of the datagram after `packet` — confirm.
    rest: Option<BytesMut>,
    /// Keys associated with this connection attempt
    crypto: Keys,
    /// Token state; backs `remote_address_validated`, `may_retry` and `orig_dst_cid`
    token: IncomingToken,
    /// Key of this connection attempt's slot in the endpoint's incoming buffers
    incoming_idx: usize,
    /// Logs a warning if the `Incoming` is dropped without being dismissed first
    improper_drop_warner: IncomingImproperDropWarner,
}
1231
1232impl Incoming {
1233    /// The local IP address which was used when the peer established the connection
1234    ///
1235    /// This has the same behavior as [`Connection::local_ip`].
1236    pub fn local_ip(&self) -> Option<IpAddr> {
1237        self.addresses.local_ip
1238    }
1239
1240    /// The peer's UDP address
1241    pub fn remote_address(&self) -> SocketAddr {
1242        self.addresses.remote
1243    }
1244
1245    /// Whether the socket address that is initiating this connection has been validated
1246    ///
1247    /// This means that the sender of the initial packet has proved that they can receive traffic
1248    /// sent to `self.remote_address()`.
1249    ///
1250    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1251    /// The inverse is not guaranteed.
1252    pub fn remote_address_validated(&self) -> bool {
1253        self.token.validated
1254    }
1255
1256    /// Whether it is legal to respond with a retry packet
1257    ///
1258    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1259    /// The inverse is not guaranteed.
1260    pub fn may_retry(&self) -> bool {
1261        self.token.retry_src_cid.is_none()
1262    }
1263
1264    /// The original destination connection ID sent by the client
1265    pub fn orig_dst_cid(&self) -> ConnectionId {
1266        self.token.orig_dst_cid
1267    }
1268}
1269
1270impl fmt::Debug for Incoming {
1271    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1272        f.debug_struct("Incoming")
1273            .field("addresses", &self.addresses)
1274            .field("ecn", &self.ecn)
1275            // packet doesn't implement debug
1276            // rest is too big and not meaningful enough
1277            .field("token", &self.token)
1278            .field("incoming_idx", &self.incoming_idx)
1279            // improper drop warner contains no information
1280            .finish_non_exhaustive()
1281    }
1282}
1283
/// Guard that logs a warning when it is dropped without having been dismissed
struct IncomingImproperDropWarner;
1285
1286impl IncomingImproperDropWarner {
1287    fn dismiss(self) {
1288        mem::forget(self);
1289    }
1290}
1291
impl Drop for IncomingImproperDropWarner {
    /// Logs a warning; reaching this means the guard was dropped instead of dismissed.
    fn drop(&mut self) {
        warn!(
            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
               (may cause memory leak and eventual inability to accept new connections)"
        );
    }
}
1300
/// Errors in the parameters being used to create a new connection
///
/// These arise before any I/O has been performed.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectError {
    /// The endpoint can no longer create new connections
    ///
    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
    #[error("endpoint stopping")]
    EndpointStopping,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
    /// The given server name was malformed
    ///
    /// Carries the offending server name.
    #[error("invalid server name: {0}")]
    InvalidServerName(String),
    /// The remote [`SocketAddr`] supplied was malformed
    ///
    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
    #[error("invalid remote address: {0}")]
    InvalidRemoteAddress(SocketAddr),
    /// No default client configuration was set up
    ///
    /// Use `Endpoint::connect_with` to specify a client configuration.
    #[error("no default client config")]
    NoDefaultClientConfig,
    /// The local endpoint does not support the QUIC version specified in the client configuration
    #[error("unsupported QUIC version")]
    UnsupportedVersion,
}
1333
/// Error type for attempting to accept an [`Incoming`]
#[derive(Debug)]
pub struct AcceptError {
    /// Underlying error describing reason for failure
    pub cause: ConnectionError,
    /// Optional response to transmit back
    ///
    /// `None` when there is nothing to send.
    pub response: Option<Transmit>,
}
1342
/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
///
/// The rejected [`Incoming`] is kept inside the error and can be recovered with
/// `into_incoming`.
#[derive(Debug, Error)]
#[error("retry() with validated Incoming")]
pub struct RetryError(Box<Incoming>);
1347
1348impl RetryError {
1349    /// Get the [`Incoming`]
1350    pub fn into_incoming(self) -> Incoming {
1351        *self.0
1352    }
1353}
1354
/// Reset Tokens which are associated with peer socket addresses
///
/// The outer map is keyed by the peer's socket address; the inner map takes each reset token
/// known for that address to the connection it belongs to.
///
/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
/// peer generated and might be usable for hash collision attacks.
#[derive(Default, Debug)]
struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1361
1362impl ResetTokenTable {
1363    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1364        self.0
1365            .entry(remote)
1366            .or_default()
1367            .insert(token, ch)
1368            .is_some()
1369    }
1370
1371    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1372        use std::collections::hash_map::Entry;
1373        match self.0.entry(remote) {
1374            Entry::Vacant(_) => {}
1375            Entry::Occupied(mut e) => {
1376                e.get_mut().remove(&token);
1377                if e.get().is_empty() {
1378                    e.remove_entry();
1379                }
1380            }
1381        }
1382    }
1383
1384    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1385        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1386        self.0.get(&remote)?.get(&token)
1387    }
1388}
1389
/// Identifies a connection by the combination of remote and local addresses
///
/// Including the local ensures good behavior when the host has multiple IP addresses on the same
/// subnet and zero-length connection IDs are in use.
#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
struct FourTuple {
    /// Socket address of the peer
    remote: SocketAddr,
    /// Local IP address, if known
    // A single socket can only listen on a single port, so no need to store it explicitly
    local_ip: Option<IpAddr>,
}