iroh_quinn_proto/
endpoint.rs

1use std::{
2    collections::{HashMap, hash_map},
3    convert::TryFrom,
4    fmt, mem,
5    net::{IpAddr, SocketAddr},
6    ops::{Index, IndexMut},
7    sync::Arc,
8};
9
10use bytes::{BufMut, Bytes, BytesMut};
11use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
12use rustc_hash::FxHashMap;
13use slab::Slab;
14use thiserror::Error;
15use tracing::{debug, error, trace, warn};
16
17use crate::{
18    Duration, FourTuple, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, PathId,
19    RESET_TOKEN_SIZE, ResetToken, Side, Transmit, TransportConfig, TransportError,
20    cid_generator::ConnectionIdGenerator,
21    coding::{BufMutExt, Encodable},
22    config::{ClientConfig, EndpointConfig, ServerConfig},
23    connection::{Connection, ConnectionError, SideArgs},
24    crypto::{self, Keys, UnsupportedVersion},
25    frame,
26    packet::{
27        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
28        PacketNumber, PartialDecode, ProtectedInitialHeader,
29    },
30    shared::{
31        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32        EndpointEvent, EndpointEventInner, IssuedCid,
33    },
34    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
35    transport_parameters::{PreferredAddress, TransportParameters},
36};
37
/// The main entry point to the library
///
/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
/// connection-generated events via `handle` and `handle_event`.
pub struct Endpoint {
    /// Randomness source used for version negotiation greasing, stateless reset padding,
    /// transport parameters, retry tokens, and the seeds handed to new connections
    rng: StdRng,
    /// Lookup tables used to route incoming datagrams to connections or pending incomings
    index: ConnectionIndex,
    /// Endpoint-side bookkeeping for each connection, keyed by the integer inside its
    /// `ConnectionHandle`
    connections: Slab<ConnectionMeta>,
    /// Generator for the connection IDs issued by this endpoint
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    /// Endpoint-wide configuration
    config: Arc<EndpointConfig>,
    /// Configuration applied to incoming connections
    ///
    /// When `None`, Initial packets for unknown connections are never turned into an
    /// [`Incoming`].
    server_config: Option<Arc<ServerConfig>>,
    /// Whether the underlying UDP socket promises not to fragment packets
    allow_mtud: bool,
    /// Time at which a stateless reset was most recently sent
    last_stateless_reset: Option<Instant>,
    /// Buffered Initial and 0-RTT messages for pending incoming connections
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of `total_bytes` over every entry currently in `incoming_buffers`
    all_incoming_buffers_total_bytes: u64,
}
57
58impl Endpoint {
59    /// Create a new endpoint
60    ///
61    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
62    /// better performance. This requires that outgoing packets are never fragmented, which can be
63    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
64    ///
65    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
66    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
67    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
68    /// using [`EndpointConfig::rng_seed`].
69    pub fn new(
70        config: Arc<EndpointConfig>,
71        server_config: Option<Arc<ServerConfig>>,
72        allow_mtud: bool,
73        rng_seed: Option<[u8; 32]>,
74    ) -> Self {
75        let rng_seed = rng_seed.or(config.rng_seed);
76        Self {
77            rng: rng_seed.map_or(StdRng::from_os_rng(), StdRng::from_seed),
78            index: ConnectionIndex::default(),
79            connections: Slab::new(),
80            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
81            config,
82            server_config,
83            allow_mtud,
84            last_stateless_reset: None,
85            incoming_buffers: Slab::new(),
86            all_incoming_buffers_total_bytes: 0,
87        }
88    }
89
    /// Replace the server configuration, affecting new incoming connections only
    ///
    /// Passing `None` clears the stored server configuration.
    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
        self.server_config = server_config;
    }
94
95    /// Process `EndpointEvent`s emitted from related `Connection`s
96    ///
97    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
98    pub fn handle_event(
99        &mut self,
100        ch: ConnectionHandle,
101        event: EndpointEvent,
102    ) -> Option<ConnectionEvent> {
103        use EndpointEventInner::*;
104        match event.0 {
105            NeedIdentifiers(path_id, now, n) => {
106                return Some(self.send_new_identifiers(path_id, now, ch, n));
107            }
108            ResetToken(path_id, remote, token) => {
109                if let Some(old) = self.connections[ch]
110                    .reset_token
111                    .insert(path_id, (remote, token))
112                {
113                    self.index.connection_reset_tokens.remove(old.0, old.1);
114                }
115                if self.index.connection_reset_tokens.insert(remote, token, ch) {
116                    warn!("duplicate reset token");
117                }
118            }
119            RetireResetToken(path_id) => {
120                if let Some(old) = self.connections[ch].reset_token.remove(&path_id) {
121                    self.index.connection_reset_tokens.remove(old.0, old.1);
122                }
123            }
124            RetireConnectionId(now, path_id, seq, allow_more_cids) => {
125                if let Some(cid) = self.connections[ch]
126                    .loc_cids
127                    .get_mut(&path_id)
128                    .and_then(|pcid| pcid.cids.remove(&seq))
129                {
130                    trace!(%path_id, "local CID retired {}: {}", seq, cid);
131                    self.index.retire(cid);
132                    if allow_more_cids {
133                        return Some(self.send_new_identifiers(path_id, now, ch, 1));
134                    }
135                }
136            }
137            Drained => {
138                if let Some(conn) = self.connections.try_remove(ch.0) {
139                    self.index.remove(&conn);
140                } else {
141                    // This indicates a bug in downstream code, which could cause spurious
142                    // connection loss instead of this error if the CID was (re)allocated prior to
143                    // the illegal call.
144                    error!(id = ch.0, "unknown connection drained");
145                }
146            }
147        }
148        None
149    }
150
151    /// Process an incoming UDP datagram
152    pub fn handle(
153        &mut self,
154        now: Instant,
155        network_path: FourTuple,
156        ecn: Option<EcnCodepoint>,
157        data: BytesMut,
158        buf: &mut Vec<u8>,
159    ) -> Option<DatagramEvent> {
160        // Partially decode packet or short-circuit if unable
161        let datagram_len = data.len();
162        let mut event = match PartialDecode::new(
163            data,
164            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
165            &self.config.supported_versions,
166            self.config.grease_quic_bit,
167        ) {
168            Ok((first_decode, remaining)) => DatagramConnectionEvent {
169                now,
170                network_path,
171                path_id: PathId::ZERO, // Corrected later for existing paths
172                ecn,
173                first_decode,
174                remaining,
175            },
176            Err(PacketDecodeError::UnsupportedVersion {
177                src_cid,
178                dst_cid,
179                version,
180            }) => {
181                if self.server_config.is_none() {
182                    debug!("dropping packet with unsupported version");
183                    return None;
184                }
185                trace!("sending version negotiation");
186                // Negotiate versions
187                Header::VersionNegotiate {
188                    random: self.rng.random::<u8>() | 0x40,
189                    src_cid: dst_cid,
190                    dst_cid: src_cid,
191                }
192                .encode(buf);
193                // Grease with a reserved version
194                buf.write::<u32>(match version {
195                    0x0a1a_2a3a => 0x0a1a_2a4a,
196                    _ => 0x0a1a_2a3a,
197                });
198                for &version in &self.config.supported_versions {
199                    buf.write(version);
200                }
201                return Some(DatagramEvent::Response(Transmit {
202                    destination: network_path.remote,
203                    ecn: None,
204                    size: buf.len(),
205                    segment_size: None,
206                    src_ip: network_path.local_ip,
207                }));
208            }
209            Err(e) => {
210                trace!("malformed header: {}", e);
211                return None;
212            }
213        };
214
215        let dst_cid = event.first_decode.dst_cid();
216
217        if let Some(route_to) = self.index.get(&network_path, &event.first_decode) {
218            event.path_id = match route_to {
219                RouteDatagramTo::Incoming(_) => PathId::ZERO,
220                RouteDatagramTo::Connection(_, path_id) => path_id,
221            };
222            match route_to {
223                RouteDatagramTo::Incoming(incoming_idx) => {
224                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
225                    let config = &self.server_config.as_ref().unwrap();
226
227                    if incoming_buffer
228                        .total_bytes
229                        .checked_add(datagram_len as u64)
230                        .is_some_and(|n| n <= config.incoming_buffer_size)
231                        && self
232                            .all_incoming_buffers_total_bytes
233                            .checked_add(datagram_len as u64)
234                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
235                    {
236                        incoming_buffer.datagrams.push(event);
237                        incoming_buffer.total_bytes += datagram_len as u64;
238                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
239                    }
240
241                    None
242                }
243                RouteDatagramTo::Connection(ch, _path_id) => Some(DatagramEvent::ConnectionEvent(
244                    ch,
245                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
246                )),
247            }
248        } else if event.first_decode.initial_header().is_some() {
249            // Potentially create a new connection
250
251            self.handle_first_packet(datagram_len, event, network_path, buf)
252        } else if event.first_decode.has_long_header() {
253            debug!(
254                "ignoring non-initial packet for unknown connection {}",
255                dst_cid
256            );
257            None
258        } else if !event.first_decode.is_initial()
259            && self.local_cid_generator.validate(dst_cid).is_err()
260        {
261            debug!("dropping packet with invalid CID");
262            None
263        } else if dst_cid.is_empty() {
264            trace!("dropping unrecognized short packet without ID");
265            None
266        } else {
267            // If we got this far, we're receiving a seemingly valid packet for an unknown
268            // connection. Send a stateless reset if possible.
269            self.stateless_reset(now, datagram_len, network_path, dst_cid, buf)
270                .map(DatagramEvent::Response)
271        }
272    }
273
274    /// Builds a stateless reset packet to respond with
275    fn stateless_reset(
276        &mut self,
277        now: Instant,
278        inciting_dgram_len: usize,
279        network_path: FourTuple,
280        dst_cid: ConnectionId,
281        buf: &mut Vec<u8>,
282    ) -> Option<Transmit> {
283        if self
284            .last_stateless_reset
285            .is_some_and(|last| last + self.config.min_reset_interval > now)
286        {
287            debug!("ignoring unexpected packet within minimum stateless reset interval");
288            return None;
289        }
290
291        /// Minimum amount of padding for the stateless reset to look like a short-header packet
292        const MIN_PADDING_LEN: usize = 5;
293
294        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
295        // smaller than the inciting packet.
296        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
297            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
298            _ => {
299                debug!(
300                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
301                    inciting_dgram_len
302                );
303                return None;
304            }
305        };
306
307        debug!(
308            "sending stateless reset for {} to {}",
309            dst_cid, network_path.remote
310        );
311        self.last_stateless_reset = Some(now);
312        // Resets with at least this much padding can't possibly be distinguished from real packets
313        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
314        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
315            max_padding_len
316        } else {
317            self.rng
318                .random_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
319        };
320        buf.reserve(padding_len + RESET_TOKEN_SIZE);
321        buf.resize(padding_len, 0);
322        self.rng.fill_bytes(&mut buf[0..padding_len]);
323        buf[0] = 0b0100_0000 | (buf[0] >> 2);
324        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
325
326        debug_assert!(buf.len() < inciting_dgram_len);
327
328        Some(Transmit {
329            destination: network_path.remote,
330            ecn: None,
331            size: buf.len(),
332            segment_size: None,
333            src_ip: network_path.local_ip,
334        })
335    }
336
337    /// Initiate a connection
338    pub fn connect(
339        &mut self,
340        now: Instant,
341        config: ClientConfig,
342        remote: SocketAddr,
343        server_name: &str,
344    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
345        if self.cids_exhausted() {
346            return Err(ConnectError::CidsExhausted);
347        }
348        if remote.port() == 0 || remote.ip().is_unspecified() {
349            return Err(ConnectError::InvalidRemoteAddress(remote));
350        }
351        if !self.config.supported_versions.contains(&config.version) {
352            return Err(ConnectError::UnsupportedVersion);
353        }
354
355        let remote_id = (config.initial_dst_cid_provider)();
356        trace!(initial_dcid = %remote_id);
357
358        let ch = ConnectionHandle(self.connections.vacant_key());
359        let loc_cid = self.new_cid(ch, PathId::ZERO);
360        let params = TransportParameters::new(
361            &config.transport,
362            &self.config,
363            self.local_cid_generator.as_ref(),
364            loc_cid,
365            None,
366            &mut self.rng,
367        );
368        let tls = config
369            .crypto
370            .start_session(config.version, server_name, &params)?;
371
372        let conn = self.add_connection(
373            ch,
374            config.version,
375            remote_id,
376            loc_cid,
377            remote_id,
378            FourTuple {
379                remote,
380                local_ip: None,
381            },
382            now,
383            tls,
384            config.transport,
385            SideArgs::Client {
386                token_store: config.token_store,
387                server_name: server_name.into(),
388            },
389            &params,
390        );
391        Ok((ch, conn))
392    }
393
394    /// Generates new CIDs and creates message to send to the connection state
395    fn send_new_identifiers(
396        &mut self,
397        path_id: PathId,
398        now: Instant,
399        ch: ConnectionHandle,
400        num: u64,
401    ) -> ConnectionEvent {
402        let mut ids = vec![];
403        for _ in 0..num {
404            let id = self.new_cid(ch, path_id);
405            let cid_meta = self.connections[ch].loc_cids.entry(path_id).or_default();
406            let sequence = cid_meta.issued;
407            cid_meta.issued += 1;
408            cid_meta.cids.insert(sequence, id);
409            ids.push(IssuedCid {
410                path_id,
411                sequence,
412                id,
413                reset_token: ResetToken::new(&*self.config.reset_key, id),
414            });
415        }
416        ConnectionEvent(ConnectionEventInner::NewIdentifiers(
417            ids,
418            now,
419            self.local_cid_generator.cid_len(),
420            self.local_cid_generator.cid_lifetime(),
421        ))
422    }
423
424    /// Generate a connection ID for `ch`
425    fn new_cid(&mut self, ch: ConnectionHandle, path_id: PathId) -> ConnectionId {
426        loop {
427            let cid = self.local_cid_generator.generate_cid();
428            if cid.is_empty() {
429                // Zero-length CID; nothing to track
430                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
431                return cid;
432            }
433            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
434                e.insert((ch, path_id));
435                break cid;
436            }
437        }
438    }
439
    /// Handle an Initial packet that matched neither a known connection nor a pending incoming
    ///
    /// Returns `DatagramEvent::NewConnection` carrying an [`Incoming`] when the packet passes
    /// all checks, `DatagramEvent::Response` when a reply (stateless reset or connection close)
    /// was written to `buf`, and `None` when the packet is dropped silently.
    ///
    /// Panics if `event.first_decode` has no Initial header; the caller checks this first.
    fn handle_first_packet(
        &mut self,
        datagram_len: usize,
        event: DatagramConnectionEvent,
        network_path: FourTuple,
        buf: &mut Vec<u8>,
    ) -> Option<DatagramEvent> {
        let dst_cid = event.first_decode.dst_cid();
        let header = event.first_decode.initial_header().unwrap();

        // Without a server configuration we cannot accept anything; answer with a stateless
        // reset (subject to its own rate and size limits).
        let Some(server_config) = &self.server_config else {
            debug!("packet for unrecognized connection {}", dst_cid);
            return self
                .stateless_reset(event.now, datagram_len, network_path, dst_cid, buf)
                .map(DatagramEvent::Response);
        };

        if datagram_len < MIN_INITIAL_SIZE as usize {
            debug!("ignoring short initial for connection {}", dst_cid);
            return None;
        }

        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
            Ok(keys) => keys,
            Err(UnsupportedVersion) => {
                // This probably indicates that the user set supported_versions incorrectly in
                // `EndpointConfig`.
                debug!(
                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
                    header.version
                );
                return None;
            }
        };

        // Checks that only need the still-protected header: capacity limits and DCID length.
        if let Err(reason) = self.early_validate_first_packet(header) {
            return Some(DatagramEvent::Response(self.initial_close(
                header.version,
                network_path,
                &crypto,
                header.src_cid,
                reason,
                buf,
            )));
        }

        // Complete decoding of the packet using the Initial header keys.
        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
            Ok(packet) => packet,
            Err(e) => {
                trace!("unable to decode initial packet: {}", e);
                return None;
            }
        };

        if !packet.reserved_bits_valid() {
            debug!("dropping connection attempt with invalid reserved bits");
            return None;
        }

        let Header::Initial(header) = packet.header else {
            panic!("non-initial packet in handle_first_packet()");
        };

        // Re-fetch (and clone) the configuration: the earlier borrow could not be held across
        // the `&mut self` calls above.
        let server_config = self.server_config.as_ref().unwrap().clone();

        let token = match IncomingToken::from_header(&header, &server_config, network_path.remote) {
            Ok(token) => token,
            Err(InvalidRetryTokenError) => {
                debug!("rejecting invalid retry token");
                return Some(DatagramEvent::Response(self.initial_close(
                    header.version,
                    network_path,
                    &crypto,
                    header.src_cid,
                    TransportError::INVALID_TOKEN(""),
                    buf,
                )));
            }
        };

        // Reserve a buffer for datagrams that arrive before the application decides what to do
        // with this attempt, and route further packets for this DCID to it.
        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
        self.index
            .insert_initial_incoming(header.dst_cid, incoming_idx);

        Some(DatagramEvent::NewConnection(Incoming {
            received_at: event.now,
            network_path,
            ecn: event.ecn,
            packet: InitialPacket {
                header,
                header_data: packet.header_data,
                payload: packet.payload,
            },
            rest: event.remaining,
            crypto,
            token,
            incoming_idx,
            improper_drop_warner: IncomingImproperDropWarner,
        }))
    }
540
    /// Attempt to accept this incoming connection (an error may still occur)
    ///
    /// `server_config`, when `Some`, is used for this connection instead of the endpoint's own
    /// server configuration. Datagrams buffered for `incoming` while it was pending are fed to
    /// the new connection once its first packet has been processed successfully.
    ///
    /// # Errors
    ///
    /// Returns an [`AcceptError`] when the Initial is older than the configured idle timeout,
    /// when connection IDs are exhausted, when the Initial packet fails authentication, or when
    /// the connection rejects its first packet. `AcceptError::response`, if present, describes
    /// a close packet to send to the peer (presumably encoded into `buf` by `initial_close`).
    // box err to avoid clippy::result_large_err
    pub fn accept(
        &mut self,
        mut incoming: Incoming,
        now: Instant,
        buf: &mut Vec<u8>,
        server_config: Option<Arc<ServerConfig>>,
    ) -> Result<(ConnectionHandle, Connection), Box<AcceptError>> {
        let remote_address_validated = incoming.remote_address_validated();
        incoming.improper_drop_warner.dismiss();
        // Take ownership of the datagrams buffered for this attempt and release their share of
        // the endpoint-wide byte budget.
        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;

        let packet_number = incoming.packet.header.number.expand(0);
        let InitialHeader {
            src_cid,
            dst_cid,
            version,
            ..
        } = incoming.packet.header;
        let server_config =
            server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());

        // Give up if the Initial has been waiting for at least the idle timeout.
        if server_config
            .transport
            .max_idle_timeout
            .is_some_and(|timeout| {
                incoming.received_at + Duration::from_millis(timeout.into()) <= now
            })
        {
            debug!("abandoning accept of stale initial");
            self.index.remove_initial(dst_cid);
            return Err(Box::new(AcceptError {
                cause: ConnectionError::TimedOut,
                response: None,
            }));
        }

        if self.cids_exhausted() {
            debug!("refusing connection");
            self.index.remove_initial(dst_cid);
            return Err(Box::new(AcceptError {
                cause: ConnectionError::CidsExhausted,
                response: Some(self.initial_close(
                    version,
                    incoming.network_path,
                    &incoming.crypto,
                    src_cid,
                    TransportError::CONNECTION_REFUSED(""),
                    buf,
                )),
            }));
        }

        // Decrypt the Initial payload in place; a failure means the packet is not authentic.
        if incoming
            .crypto
            .packet
            .remote
            .decrypt(
                PathId::ZERO,
                packet_number,
                &incoming.packet.header_data,
                &mut incoming.packet.payload,
            )
            .is_err()
        {
            debug!(packet_number, "failed to authenticate initial packet");
            self.index.remove_initial(dst_cid);
            return Err(Box::new(AcceptError {
                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
                response: None,
            }));
        };

        // From here on, connection IDs are registered under `ch`; the error path below relies
        // on the `Drained` handling to remove the connection again.
        let ch = ConnectionHandle(self.connections.vacant_key());
        let loc_cid = self.new_cid(ch, PathId::ZERO);
        let mut params = TransportParameters::new(
            &server_config.transport,
            &self.config,
            self.local_cid_generator.as_ref(),
            loc_cid,
            Some(&server_config),
            &mut self.rng,
        );
        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
        params.retry_src_cid = incoming.token.retry_src_cid;
        // A preferred address is advertised together with its own connection ID and reset token.
        let mut pref_addr_cid = None;
        if server_config.has_preferred_address() {
            let cid = self.new_cid(ch, PathId::ZERO);
            pref_addr_cid = Some(cid);
            params.preferred_address = Some(PreferredAddress {
                address_v4: server_config.preferred_address_v4,
                address_v6: server_config.preferred_address_v6,
                connection_id: cid,
                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
            });
        }

        let tls = server_config.crypto.clone().start_session(version, &params);
        let transport_config = server_config.transport.clone();
        let mut conn = self.add_connection(
            ch,
            version,
            dst_cid,
            loc_cid,
            src_cid,
            incoming.network_path,
            incoming.received_at,
            tls,
            transport_config,
            SideArgs::Server {
                server_config,
                pref_addr_cid,
                path_validated: remote_address_validated,
            },
            &params,
        );
        // Packets addressed to the client's original DCID now go to the connection.
        self.index.insert_initial(dst_cid, ch);

        match conn.handle_first_packet(
            incoming.received_at,
            incoming.network_path,
            incoming.ecn,
            packet_number,
            incoming.packet,
            incoming.rest,
        ) {
            Ok(()) => {
                trace!(id = ch.0, icid = %dst_cid, "new connection");

                // Replay everything that arrived while the attempt was pending.
                for event in incoming_buffer.datagrams {
                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
                }

                Ok((ch, conn))
            }
            Err(e) => {
                debug!("handshake failed: {}", e);
                // Remove the connection's endpoint-side state again.
                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
                // Only transport errors are reported to the peer with a close packet.
                let response = match e {
                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
                        version,
                        incoming.network_path,
                        &incoming.crypto,
                        src_cid,
                        e.clone(),
                        buf,
                    )),
                    _ => None,
                };
                Err(Box::new(AcceptError { cause: e, response }))
            }
        }
    }
697
698    /// Check if we should refuse a connection attempt regardless of the packet's contents
699    fn early_validate_first_packet(
700        &mut self,
701        header: &ProtectedInitialHeader,
702    ) -> Result<(), TransportError> {
703        let config = &self.server_config.as_ref().unwrap();
704        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
705            return Err(TransportError::CONNECTION_REFUSED(""));
706        }
707
708        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
709        // bytes. If this is a Retry packet, then the length must instead match our usual CID
710        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
711        // also need to validate CID length for those after decoding the token.
712        if header.dst_cid.len() < 8
713            && (header.token_pos.is_empty()
714                || header.dst_cid.len() != self.local_cid_generator.cid_len())
715        {
716            debug!(
717                "rejecting connection due to invalid DCID length {}",
718                header.dst_cid.len()
719            );
720            return Err(TransportError::PROTOCOL_VIOLATION(
721                "invalid destination CID length",
722            ));
723        }
724
725        Ok(())
726    }
727
728    /// Reject this incoming connection attempt
729    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
730        self.clean_up_incoming(&incoming);
731        incoming.improper_drop_warner.dismiss();
732
733        self.initial_close(
734            incoming.packet.header.version,
735            incoming.network_path,
736            &incoming.crypto,
737            incoming.packet.header.src_cid,
738            TransportError::CONNECTION_REFUSED(""),
739            buf,
740        )
741    }
742
743    /// Respond with a retry packet, requiring the client to retry with address validation
744    ///
745    /// Errors if `incoming.may_retry()` is false.
746    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
747        if !incoming.may_retry() {
748            return Err(RetryError(Box::new(incoming)));
749        }
750
751        self.clean_up_incoming(&incoming);
752        incoming.improper_drop_warner.dismiss();
753
754        let server_config = self.server_config.as_ref().unwrap();
755
756        // First Initial
757        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
758        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
759        // with established connections. In the unlikely event that a collision occurs
760        // between two connections in the initial phase, both will fail fast and may be
761        // retried by the application layer.
762        let loc_cid = self.local_cid_generator.generate_cid();
763
764        let payload = TokenPayload::Retry {
765            address: incoming.network_path.remote,
766            orig_dst_cid: incoming.packet.header.dst_cid,
767            issued: server_config.time_source.now(),
768        };
769        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
770
771        let header = Header::Retry {
772            src_cid: loc_cid,
773            dst_cid: incoming.packet.header.src_cid,
774            version: incoming.packet.header.version,
775        };
776
777        let encode = header.encode(buf);
778        buf.put_slice(&token);
779        buf.extend_from_slice(&server_config.crypto.retry_tag(
780            incoming.packet.header.version,
781            incoming.packet.header.dst_cid,
782            buf,
783        ));
784        encode.finish(buf, &*incoming.crypto.header.local, None);
785
786        Ok(Transmit {
787            destination: incoming.network_path.remote,
788            ecn: None,
789            size: buf.len(),
790            segment_size: None,
791            src_ip: incoming.network_path.local_ip,
792        })
793    }
794
    /// Ignore this incoming connection attempt, not sending any packet in response
    ///
    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
    pub fn ignore(&mut self, incoming: Incoming) {
        // Release the index entry and datagram buffer held for this attempt.
        self.clean_up_incoming(&incoming);
        incoming.improper_drop_warner.dismiss();
    }
803
804    /// Clean up endpoint data structures associated with an `Incoming`.
805    fn clean_up_incoming(&mut self, incoming: &Incoming) {
806        self.index.remove_initial(incoming.packet.header.dst_cid);
807        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
808        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
809    }
810
811    fn add_connection(
812        &mut self,
813        ch: ConnectionHandle,
814        version: u32,
815        init_cid: ConnectionId,
816        loc_cid: ConnectionId,
817        rem_cid: ConnectionId,
818        network_path: FourTuple,
819        now: Instant,
820        tls: Box<dyn crypto::Session>,
821        transport_config: Arc<TransportConfig>,
822        side_args: SideArgs,
823        // Only used for qlog.
824        params: &TransportParameters,
825    ) -> Connection {
826        let mut rng_seed = [0; 32];
827        self.rng.fill_bytes(&mut rng_seed);
828        let side = side_args.side();
829        let pref_addr_cid = side_args.pref_addr_cid();
830
831        let qlog =
832            transport_config.create_qlog_sink(side_args.side(), network_path.remote, init_cid, now);
833
834        qlog.emit_connection_started(
835            now,
836            loc_cid,
837            rem_cid,
838            network_path.remote,
839            network_path.local_ip,
840            params,
841        );
842
843        let conn = Connection::new(
844            self.config.clone(),
845            transport_config,
846            init_cid,
847            loc_cid,
848            rem_cid,
849            network_path,
850            tls,
851            self.local_cid_generator.as_ref(),
852            now,
853            version,
854            self.allow_mtud,
855            rng_seed,
856            side_args,
857            qlog,
858        );
859
860        let mut path_cids = PathLocalCids::default();
861        path_cids.cids.insert(path_cids.issued, loc_cid);
862        path_cids.issued += 1;
863
864        if let Some(cid) = pref_addr_cid {
865            debug_assert_eq!(path_cids.issued, 1, "preferred address cid seq must be 1");
866            path_cids.cids.insert(path_cids.issued, cid);
867            path_cids.issued += 1;
868        }
869
870        let id = self.connections.insert(ConnectionMeta {
871            init_cid,
872            loc_cids: FxHashMap::from_iter([(PathId::ZERO, path_cids)]),
873            network_path,
874            side,
875            reset_token: Default::default(),
876        });
877        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
878
879        self.index.insert_conn(network_path, loc_cid, ch, side);
880
881        conn
882    }
883
884    fn initial_close(
885        &mut self,
886        version: u32,
887        network_path: FourTuple,
888        crypto: &Keys,
889        remote_id: ConnectionId,
890        reason: TransportError,
891        buf: &mut Vec<u8>,
892    ) -> Transmit {
893        // We don't need to worry about CID collisions in initial closes because the peer
894        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
895        // unexpected response.
896        let local_id = self.local_cid_generator.generate_cid();
897        let number = PacketNumber::U8(0);
898        let header = Header::Initial(InitialHeader {
899            dst_cid: remote_id,
900            src_cid: local_id,
901            number,
902            token: Bytes::new(),
903            version,
904        });
905
906        let partial_encode = header.encode(buf);
907        let max_len =
908            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
909        frame::Close::from(reason).encoder(max_len).encode(buf);
910        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
911        partial_encode.finish(
912            buf,
913            &*crypto.header.local,
914            Some((0, Default::default(), &*crypto.packet.local)),
915        );
916        Transmit {
917            destination: network_path.remote,
918            ecn: None,
919            size: buf.len(),
920            segment_size: None,
921            src_ip: network_path.local_ip,
922        }
923    }
924
925    /// Access the configuration used by this endpoint
926    pub fn config(&self) -> &EndpointConfig {
927        &self.config
928    }
929
930    /// Number of connections that are currently open
931    pub fn open_connections(&self) -> usize {
932        self.connections.len()
933    }
934
935    /// Counter for the number of bytes currently used
936    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
937    pub fn incoming_buffer_bytes(&self) -> u64 {
938        self.all_incoming_buffers_total_bytes
939    }
940
941    #[cfg(test)]
942    pub(crate) fn known_connections(&self) -> usize {
943        let x = self.connections.len();
944        debug_assert_eq!(x, self.index.connection_ids_initial.len());
945        // Not all connections have known reset tokens
946        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
947        // Not all connections have unique remotes, and 0-length CIDs might not be in use.
948        debug_assert!(x >= self.index.incoming_connection_remotes.len());
949        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
950        x
951    }
952
953    #[cfg(test)]
954    pub(crate) fn known_cids(&self) -> usize {
955        self.index.connection_ids.len()
956    }
957
958    /// Whether we've used up 3/4 of the available CID space
959    ///
960    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
961    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
962    fn cids_exhausted(&self) -> bool {
963        self.local_cid_generator.cid_len() <= 4
964            && self.local_cid_generator.cid_len() != 0
965            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
966                - self.index.connection_ids.len())
967                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
968    }
969}
970
971impl fmt::Debug for Endpoint {
972    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
973        fmt.debug_struct("Endpoint")
974            .field("rng", &self.rng)
975            .field("index", &self.index)
976            .field("connections", &self.connections)
977            .field("config", &self.config)
978            .field("server_config", &self.server_config)
979            // incoming_buffers too large
980            .field("incoming_buffers.len", &self.incoming_buffers.len())
981            .field(
982                "all_incoming_buffers_total_bytes",
983                &self.all_incoming_buffers_total_bytes,
984            )
985            .finish()
986    }
987}
988
/// Buffered Initial and 0-RTT messages for a pending incoming connection
#[derive(Default)]
struct IncomingBuffer {
    /// Datagram events held back for the pending connection
    datagrams: Vec<DatagramConnectionEvent>,
    /// Byte count attributed to this buffer
    ///
    /// Deducted from the endpoint-wide `all_incoming_buffers_total_bytes` when the buffer is
    /// removed. Presumably the combined size of `datagrams`; confirm where buffers are filled.
    total_bytes: u64,
}
995
/// Part of protocol state incoming datagrams can be routed to
#[derive(Copy, Clone, Debug)]
enum RouteDatagramTo {
    /// A pending incoming connection, identified by the key it was registered with in
    /// `ConnectionIndex::insert_initial_incoming`
    Incoming(usize),
    /// An existing connection, together with the path the datagram is attributed to
    Connection(ConnectionHandle, PathId),
}
1002
/// Maps packets to existing connections
///
/// Lookups (see `ConnectionIndex::get`) consult the tables in this order: locally created
/// CIDs, initial DCIDs (Initial and 0-RTT packets only), the zero-length-CID remote tables,
/// and finally the stateless reset tokens.
#[derive(Default, Debug)]
struct ConnectionIndex {
    /// Identifies connections based on the initial DCID the peer utilized
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    ///
    /// Used by the server, not the client. Zero-length CIDs are never stored here.
    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
    /// Identifies connections based on locally created CIDs
    ///
    /// Uses a cheaper hash function since keys are locally created.
    connection_ids: FxHashMap<ConnectionId, (ConnectionHandle, PathId)>,
    /// Identifies incoming connections with zero-length CIDs
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
    /// Identifies outgoing connections with zero-length CIDs
    ///
    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
    /// require a unique 4-tuple, so at most one client connection with zero-length local CIDs
    /// may be established per remote. We must omit the local address from the key because we don't
    /// necessarily know what address we're sending from, and hence receiving at.
    ///
    /// Uses a standard `HashMap` to protect against hash collision attacks.
    // TODO(matheus23): It's possible this could be changed now that we track the full 4-tuple on the client side, too.
    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
    /// Reset tokens provided by the peer for the CID each connection is currently sending to
    ///
    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
    /// recipient, if any.
    connection_reset_tokens: ResetTokenTable,
}
1036
1037impl ConnectionIndex {
1038    /// Associate an incoming connection with its initial destination CID
1039    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1040        if dst_cid.is_empty() {
1041            return;
1042        }
1043        self.connection_ids_initial
1044            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1045    }
1046
1047    /// Remove an association with an initial destination CID
1048    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1049        if dst_cid.is_empty() {
1050            return;
1051        }
1052        let removed = self.connection_ids_initial.remove(&dst_cid);
1053        debug_assert!(removed.is_some());
1054    }
1055
1056    /// Associate a connection with its initial destination CID
1057    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1058        if dst_cid.is_empty() {
1059            return;
1060        }
1061        self.connection_ids_initial.insert(
1062            dst_cid,
1063            RouteDatagramTo::Connection(connection, PathId::ZERO),
1064        );
1065    }
1066
1067    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1068    /// its current 4-tuple
1069    fn insert_conn(
1070        &mut self,
1071        network_path: FourTuple,
1072        dst_cid: ConnectionId,
1073        connection: ConnectionHandle,
1074        side: Side,
1075    ) {
1076        match dst_cid.len() {
1077            0 => match side {
1078                Side::Server => {
1079                    self.incoming_connection_remotes
1080                        .insert(network_path, connection);
1081                }
1082                Side::Client => {
1083                    self.outgoing_connection_remotes
1084                        .insert(network_path.remote, connection);
1085                }
1086            },
1087            _ => {
1088                self.connection_ids
1089                    .insert(dst_cid, (connection, PathId::ZERO));
1090            }
1091        }
1092    }
1093
1094    /// Discard a connection ID
1095    fn retire(&mut self, dst_cid: ConnectionId) {
1096        self.connection_ids.remove(&dst_cid);
1097    }
1098
1099    /// Remove all references to a connection
1100    fn remove(&mut self, conn: &ConnectionMeta) {
1101        if conn.side.is_server() {
1102            self.remove_initial(conn.init_cid);
1103        }
1104        for cid in conn.loc_cids.values().flat_map(|pcids| pcids.cids.values()) {
1105            self.connection_ids.remove(cid);
1106        }
1107        self.incoming_connection_remotes.remove(&conn.network_path);
1108        self.outgoing_connection_remotes
1109            .remove(&conn.network_path.remote);
1110        for (remote, token) in conn.reset_token.values() {
1111            self.connection_reset_tokens.remove(*remote, *token);
1112        }
1113    }
1114
1115    /// Find the existing connection that `datagram` should be routed to, if any
1116    fn get(&self, network_path: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1117        if !datagram.dst_cid().is_empty() {
1118            if let Some(&(ch, path_id)) = self.connection_ids.get(&datagram.dst_cid()) {
1119                return Some(RouteDatagramTo::Connection(ch, path_id));
1120            }
1121        }
1122        if datagram.is_initial() || datagram.is_0rtt() {
1123            if let Some(&ch) = self.connection_ids_initial.get(&datagram.dst_cid()) {
1124                return Some(ch);
1125            }
1126        }
1127        if datagram.dst_cid().is_empty() {
1128            if let Some(&ch) = self.incoming_connection_remotes.get(network_path) {
1129                // Never multipath because QUIC-MULTIPATH 1.1 mandates the use of non-zero
1130                // length CIDs.  So this is always PathId::ZERO.
1131                return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1132            }
1133            if let Some(&ch) = self.outgoing_connection_remotes.get(&network_path.remote) {
1134                // Like above, QUIC-MULTIPATH 1.1 mandates the use of non-zero length CIDs.
1135                return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1136            }
1137        }
1138        let data = datagram.data();
1139        if data.len() < RESET_TOKEN_SIZE {
1140            return None;
1141        }
1142        // For stateless resets the PathId is meaningless since it closes the entire
1143        // connection regardless of path.  So use PathId::ZERO.
1144        self.connection_reset_tokens
1145            .get(network_path.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1146            .cloned()
1147            .map(|ch| RouteDatagramTo::Connection(ch, PathId::ZERO))
1148    }
1149}
1150
/// Per-connection bookkeeping kept by the endpoint
///
/// Holds what `ConnectionIndex::remove` needs to drop every routing entry of a connection.
#[derive(Debug)]
pub(crate) struct ConnectionMeta {
    /// Initial destination CID of the connection; only server-side connections have it
    /// removed from the initial-DCID table on teardown
    init_cid: ConnectionId,
    /// Locally issued CIDs for each path
    loc_cids: FxHashMap<PathId, PathLocalCids>,
    /// Remote/local addresses the connection began with
    ///
    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
    /// bother keeping it up to date.
    network_path: FourTuple,
    /// Whether this is the client or the server end of the connection
    side: Side,
    /// Reset tokens provided by the peer for CIDs we're currently sending to
    ///
    /// Since each reset token is for a CID, it is also for a fixed remote address which is
    /// also stored. This allows us to look up which reset tokens we might expect from a
    /// given remote address, see [`ResetTokenTable`].
    ///
    /// Each path has its own active CID. We use the [`PathId`] as a unique index, allowing
    /// us to retire the reset token when a path is abandoned.
    reset_token: FxHashMap<PathId, (SocketAddr, ResetToken)>,
}
1172
/// Local connection IDs for a single path
#[derive(Debug, Default)]
struct PathLocalCids {
    /// Number of connection IDs that have been issued in (PATH_)NEW_CONNECTION_ID frames
    ///
    /// Another way of saying this is that this is the next sequence number to be issued.
    /// Starts at 0 for a default-constructed value.
    issued: u64,
    /// Issued CIDs indexed by their sequence number.
    cids: FxHashMap<u64, ConnectionId>,
}
1183
/// Internal identifier for a `Connection` currently associated with an endpoint
///
/// Wraps the `usize` key under which the connection's metadata is stored in the endpoint's
/// connection slab.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ConnectionHandle(pub usize);
1187
1188impl From<ConnectionHandle> for usize {
1189    fn from(x: ConnectionHandle) -> Self {
1190        x.0
1191    }
1192}
1193
1194impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1195    type Output = ConnectionMeta;
1196    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1197        &self[ch.0]
1198    }
1199}
1200
1201impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1202    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1203        &mut self[ch.0]
1204    }
1205}
1206
/// Event resulting from processing a single datagram
pub enum DatagramEvent {
    /// The datagram is redirected to its `Connection`
    ///
    /// Carries the handle of the target connection and the event to deliver to it.
    ConnectionEvent(ConnectionHandle, ConnectionEvent),
    /// The datagram may result in starting a new `Connection`
    ///
    /// The contained [`Incoming`] must be passed back to the endpoint (for example via
    /// `Endpoint::ignore`) rather than dropped, or endpoint state is leaked.
    NewConnection(Incoming),
    /// Response generated directly by the endpoint
    Response(Transmit),
}
1216
/// An incoming connection for which the server has not yet begun its part of the handshake.
///
/// Must be handed back to the endpoint instead of being dropped; dropping it logs a warning
/// and leaves endpoint state behind.
pub struct Incoming {
    // NOTE(review): presumably the arrival time of the first datagram; confirm at construction.
    received_at: Instant,
    /// Addresses of the attempt; source of `local_ip()` and `remote_address()`
    network_path: FourTuple,
    ecn: Option<EcnCodepoint>,
    /// The Initial packet that started the attempt; its header's destination CID keys the
    /// endpoint's initial-DCID routing entry
    packet: InitialPacket,
    // NOTE(review): presumably the remainder of the datagram after `packet`; confirm.
    rest: Option<BytesMut>,
    crypto: Keys,
    /// Token state; backs `remote_address_validated()`, `may_retry()` and `orig_dst_cid()`
    token: IncomingToken,
    /// Key of this attempt's entry in the endpoint's `incoming_buffers`
    incoming_idx: usize,
    /// Logs a warning on drop unless dismissed by the endpoint
    improper_drop_warner: IncomingImproperDropWarner,
}
1229
1230impl Incoming {
1231    /// The local IP address which was used when the peer established the connection
1232    ///
1233    /// This has the same behavior as [`Connection::local_ip`].
1234    pub fn local_ip(&self) -> Option<IpAddr> {
1235        self.network_path.local_ip
1236    }
1237
1238    /// The peer's UDP address
1239    pub fn remote_address(&self) -> SocketAddr {
1240        self.network_path.remote
1241    }
1242
1243    /// Whether the socket address that is initiating this connection has been validated
1244    ///
1245    /// This means that the sender of the initial packet has proved that they can receive traffic
1246    /// sent to `self.remote_address()`.
1247    ///
1248    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1249    /// The inverse is not guaranteed.
1250    pub fn remote_address_validated(&self) -> bool {
1251        self.token.validated
1252    }
1253
1254    /// Whether it is legal to respond with a retry packet
1255    ///
1256    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1257    /// The inverse is not guaranteed.
1258    pub fn may_retry(&self) -> bool {
1259        self.token.retry_src_cid.is_none()
1260    }
1261
1262    /// The original destination connection ID sent by the client
1263    pub fn orig_dst_cid(&self) -> ConnectionId {
1264        self.token.orig_dst_cid
1265    }
1266}
1267
1268impl fmt::Debug for Incoming {
1269    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1270        f.debug_struct("Incoming")
1271            .field("network_path", &self.network_path)
1272            .field("ecn", &self.ecn)
1273            // packet doesn't implement debug
1274            // rest is too big and not meaningful enough
1275            .field("token", &self.token)
1276            .field("incoming_idx", &self.incoming_idx)
1277            // improper drop warner contains no information
1278            .finish_non_exhaustive()
1279    }
1280}
1281
/// Drop guard embedded in [`Incoming`]
///
/// Logs a warning when dropped, unless it was consumed with `dismiss` first.
struct IncomingImproperDropWarner;
1283
1284impl IncomingImproperDropWarner {
1285    fn dismiss(self) {
1286        mem::forget(self);
1287    }
1288}
1289
// Runs only when a warner is dropped without `dismiss` having been called, i.e. when the
// owning `Incoming` was dropped instead of being handed back to the endpoint.
impl Drop for IncomingImproperDropWarner {
    fn drop(&mut self) {
        // Only logs; the endpoint state tied to the `Incoming` is not released here.
        warn!(
            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
               (may cause memory leak and eventual inability to accept new connections)"
        );
    }
}
1298
/// Errors in the parameters being used to create a new connection
///
/// These arise before any I/O has been performed.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectError {
    /// The endpoint can no longer create new connections
    ///
    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
    #[error("endpoint stopping")]
    EndpointStopping,
    /// The connection could not be created because not enough of the CID space is available
    ///
    /// Try using longer connection IDs.
    #[error("CIDs exhausted")]
    CidsExhausted,
    /// The given server name was malformed
    ///
    /// Carries a `String` that is included in the error message.
    #[error("invalid server name: {0}")]
    InvalidServerName(String),
    /// The remote [`SocketAddr`] supplied was malformed
    ///
    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
    #[error("invalid remote address: {0}")]
    InvalidRemoteAddress(SocketAddr),
    /// No default client configuration was set up
    ///
    /// Use `Endpoint::connect_with` to specify a client configuration.
    #[error("no default client config")]
    NoDefaultClientConfig,
    /// The local endpoint does not support the QUIC version specified in the client configuration
    #[error("unsupported QUIC version")]
    UnsupportedVersion,
}
1331
/// Error type for attempting to accept an [`Incoming`]
#[derive(Debug)]
pub struct AcceptError {
    /// Underlying error describing reason for failure
    pub cause: ConnectionError,
    /// Optional response to transmit back
    ///
    /// `None` when there is nothing to send to the peer.
    pub response: Option<Transmit>,
}
1340
/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
///
/// The rejected [`Incoming`] is kept (boxed) inside the error and can be recovered with
/// `RetryError::into_incoming`.
#[derive(Debug, Error)]
#[error("retry() with validated Incoming")]
pub struct RetryError(Box<Incoming>);
1345
1346impl RetryError {
1347    /// Get the [`Incoming`]
1348    pub fn into_incoming(self) -> Incoming {
1349        *self.0
1350    }
1351}
1352
/// Reset tokens which are associated with peer socket addresses
///
/// A two-level map: the outer level is keyed by the peer's address, the inner level maps each
/// reset token expected from that address to the connection it belongs to.
///
/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
/// peer generated and might be usable for hash collision attacks.
#[derive(Default, Debug)]
struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1359
1360impl ResetTokenTable {
1361    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1362        self.0
1363            .entry(remote)
1364            .or_default()
1365            .insert(token, ch)
1366            .is_some()
1367    }
1368
1369    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1370        use std::collections::hash_map::Entry;
1371        match self.0.entry(remote) {
1372            Entry::Vacant(_) => {}
1373            Entry::Occupied(mut e) => {
1374                e.get_mut().remove(&token);
1375                if e.get().is_empty() {
1376                    e.remove_entry();
1377                }
1378            }
1379        }
1380    }
1381
1382    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1383        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1384        self.0.get(&remote)?.get(&token)
1385    }
1386}