iroh_quinn_proto/
endpoint.rs

1use std::{
2    collections::{HashMap, hash_map},
3    convert::TryFrom,
4    fmt, mem,
5    net::{IpAddr, SocketAddr},
6    ops::{Index, IndexMut},
7    sync::Arc,
8};
9
10use bytes::{BufMut, Bytes, BytesMut};
11use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
12use rustc_hash::FxHashMap;
13use slab::Slab;
14use thiserror::Error;
15use tracing::{debug, error, trace, warn};
16
17use crate::{
18    Duration, FourTuple, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, PathId,
19    RESET_TOKEN_SIZE, ResetToken, Side, Transmit, TransportConfig, TransportError,
20    cid_generator::ConnectionIdGenerator,
21    coding::{BufMutExt, Encodable},
22    config::{ClientConfig, EndpointConfig, ServerConfig},
23    connection::{Connection, ConnectionError, SideArgs},
24    crypto::{self, Keys, UnsupportedVersion},
25    frame,
26    packet::{
27        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
28        PacketNumber, PartialDecode, ProtectedInitialHeader,
29    },
30    shared::{
31        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32        EndpointEvent, EndpointEventInner, IssuedCid,
33    },
34    token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
35    transport_parameters::{PreferredAddress, TransportParameters},
36};
37
38/// The main entry point to the library
39///
40/// This object performs no I/O whatsoever. Instead, it consumes incoming packets and
41/// connection-generated events via `handle` and `handle_event`.
42pub struct Endpoint {
43    rng: StdRng,
44    index: ConnectionIndex,
45    connections: Slab<ConnectionMeta>,
46    local_cid_generator: Box<dyn ConnectionIdGenerator>,
47    config: Arc<EndpointConfig>,
48    server_config: Option<Arc<ServerConfig>>,
49    /// Whether the underlying UDP socket promises not to fragment packets
50    allow_mtud: bool,
51    /// Time at which a stateless reset was most recently sent
52    last_stateless_reset: Option<Instant>,
53    /// Buffered Initial and 0-RTT messages for pending incoming connections
54    incoming_buffers: Slab<IncomingBuffer>,
55    all_incoming_buffers_total_bytes: u64,
56}
57
58impl Endpoint {
59    /// Create a new endpoint
60    ///
61    /// `allow_mtud` enables path MTU detection when requested by `Connection` configuration for
62    /// better performance. This requires that outgoing packets are never fragmented, which can be
63    /// achieved via e.g. the `IPV6_DONTFRAG` socket option.
64    ///
65    /// If `rng_seed` is provided, it will be used to initialize the endpoint's rng (having priority
66    /// over the rng seed configured in [`EndpointConfig`]). Note that the `rng_seed` parameter will
67    /// be removed in a future release, so prefer setting it to `None` and configuring rng seeds
68    /// using [`EndpointConfig::rng_seed`].
69    pub fn new(
70        config: Arc<EndpointConfig>,
71        server_config: Option<Arc<ServerConfig>>,
72        allow_mtud: bool,
73        rng_seed: Option<[u8; 32]>,
74    ) -> Self {
75        let rng_seed = rng_seed.or(config.rng_seed);
76        Self {
77            rng: rng_seed.map_or(StdRng::from_os_rng(), StdRng::from_seed),
78            index: ConnectionIndex::default(),
79            connections: Slab::new(),
80            local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
81            config,
82            server_config,
83            allow_mtud,
84            last_stateless_reset: None,
85            incoming_buffers: Slab::new(),
86            all_incoming_buffers_total_bytes: 0,
87        }
88    }
89
90    /// Replace the server configuration, affecting new incoming connections only
91    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
92        self.server_config = server_config;
93    }
94
95    /// Process `EndpointEvent`s emitted from related `Connection`s
96    ///
97    /// In turn, processing this event may return a `ConnectionEvent` for the same `Connection`.
98    pub fn handle_event(
99        &mut self,
100        ch: ConnectionHandle,
101        event: EndpointEvent,
102    ) -> Option<ConnectionEvent> {
103        use EndpointEventInner::*;
104        match event.0 {
105            NeedIdentifiers(path_id, now, n) => {
106                return Some(self.send_new_identifiers(path_id, now, ch, n));
107            }
108            ResetToken(path_id, remote, token) => {
109                if let Some(old) = self.connections[ch]
110                    .reset_token
111                    .insert(path_id, (remote, token))
112                {
113                    self.index.connection_reset_tokens.remove(old.0, old.1);
114                }
115                if self.index.connection_reset_tokens.insert(remote, token, ch) {
116                    warn!("duplicate reset token");
117                }
118            }
119            RetireResetToken(path_id) => {
120                if let Some(old) = self.connections[ch].reset_token.remove(&path_id) {
121                    self.index.connection_reset_tokens.remove(old.0, old.1);
122                }
123            }
124            RetireConnectionId(now, path_id, seq, allow_more_cids) => {
125                if let Some(cid) = self.connections[ch]
126                    .loc_cids
127                    .get_mut(&path_id)
128                    .and_then(|pcid| pcid.cids.remove(&seq))
129                {
130                    trace!(%path_id, "local CID retired {}: {}", seq, cid);
131                    self.index.retire(cid);
132                    if allow_more_cids {
133                        return Some(self.send_new_identifiers(path_id, now, ch, 1));
134                    }
135                }
136            }
137            Drained => {
138                if let Some(conn) = self.connections.try_remove(ch.0) {
139                    self.index.remove(&conn);
140                } else {
141                    // This indicates a bug in downstream code, which could cause spurious
142                    // connection loss instead of this error if the CID was (re)allocated prior to
143                    // the illegal call.
144                    error!(id = ch.0, "unknown connection drained");
145                }
146            }
147        }
148        None
149    }
150
151    /// Process an incoming UDP datagram
152    pub fn handle(
153        &mut self,
154        now: Instant,
155        network_path: FourTuple,
156        ecn: Option<EcnCodepoint>,
157        data: BytesMut,
158        buf: &mut Vec<u8>,
159    ) -> Option<DatagramEvent> {
160        // Partially decode packet or short-circuit if unable
161        let datagram_len = data.len();
162        let mut event = match PartialDecode::new(
163            data,
164            &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
165            &self.config.supported_versions,
166            self.config.grease_quic_bit,
167        ) {
168            Ok((first_decode, remaining)) => DatagramConnectionEvent {
169                now,
170                network_path,
171                path_id: PathId::ZERO, // Corrected later for existing paths
172                ecn,
173                first_decode,
174                remaining,
175            },
176            Err(PacketDecodeError::UnsupportedVersion {
177                src_cid,
178                dst_cid,
179                version,
180            }) => {
181                if self.server_config.is_none() {
182                    debug!("dropping packet with unsupported version");
183                    return None;
184                }
185                trace!("sending version negotiation");
186                // Negotiate versions
187                Header::VersionNegotiate {
188                    random: self.rng.random::<u8>() | 0x40,
189                    src_cid: dst_cid,
190                    dst_cid: src_cid,
191                }
192                .encode(buf);
193                // Grease with a reserved version
194                buf.write::<u32>(match version {
195                    0x0a1a_2a3a => 0x0a1a_2a4a,
196                    _ => 0x0a1a_2a3a,
197                });
198                for &version in &self.config.supported_versions {
199                    buf.write(version);
200                }
201                return Some(DatagramEvent::Response(Transmit {
202                    destination: network_path.remote,
203                    ecn: None,
204                    size: buf.len(),
205                    segment_size: None,
206                    src_ip: network_path.local_ip,
207                }));
208            }
209            Err(e) => {
210                trace!("malformed header: {}", e);
211                return None;
212            }
213        };
214
215        let dst_cid = event.first_decode.dst_cid();
216
217        if let Some(route_to) = self.index.get(&network_path, &event.first_decode) {
218            event.path_id = match route_to {
219                RouteDatagramTo::Incoming(_) => PathId::ZERO,
220                RouteDatagramTo::Connection(_, path_id) => path_id,
221            };
222            match route_to {
223                RouteDatagramTo::Incoming(incoming_idx) => {
224                    let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
225                    let config = &self.server_config.as_ref().unwrap();
226
227                    if incoming_buffer
228                        .total_bytes
229                        .checked_add(datagram_len as u64)
230                        .is_some_and(|n| n <= config.incoming_buffer_size)
231                        && self
232                            .all_incoming_buffers_total_bytes
233                            .checked_add(datagram_len as u64)
234                            .is_some_and(|n| n <= config.incoming_buffer_size_total)
235                    {
236                        incoming_buffer.datagrams.push(event);
237                        incoming_buffer.total_bytes += datagram_len as u64;
238                        self.all_incoming_buffers_total_bytes += datagram_len as u64;
239                    }
240
241                    None
242                }
243                RouteDatagramTo::Connection(ch, _path_id) => Some(DatagramEvent::ConnectionEvent(
244                    ch,
245                    ConnectionEvent(ConnectionEventInner::Datagram(event)),
246                )),
247            }
248        } else if event.first_decode.initial_header().is_some() {
249            // Potentially create a new connection
250
251            self.handle_first_packet(datagram_len, event, network_path, buf)
252        } else if event.first_decode.has_long_header() {
253            debug!(
254                "ignoring non-initial packet for unknown connection {}",
255                dst_cid
256            );
257            None
258        } else if !event.first_decode.is_initial()
259            && self.local_cid_generator.validate(dst_cid).is_err()
260        {
261            debug!("dropping packet with invalid CID");
262            None
263        } else if dst_cid.is_empty() {
264            trace!("dropping unrecognized short packet without ID");
265            None
266        } else {
267            // If we got this far, we're receiving a seemingly valid packet for an unknown
268            // connection. Send a stateless reset if possible.
269            self.stateless_reset(now, datagram_len, network_path, dst_cid, buf)
270                .map(DatagramEvent::Response)
271        }
272    }
273
274    /// Builds a stateless reset packet to respond with
275    fn stateless_reset(
276        &mut self,
277        now: Instant,
278        inciting_dgram_len: usize,
279        network_path: FourTuple,
280        dst_cid: ConnectionId,
281        buf: &mut Vec<u8>,
282    ) -> Option<Transmit> {
283        if self
284            .last_stateless_reset
285            .is_some_and(|last| last + self.config.min_reset_interval > now)
286        {
287            debug!("ignoring unexpected packet within minimum stateless reset interval");
288            return None;
289        }
290
291        /// Minimum amount of padding for the stateless reset to look like a short-header packet
292        const MIN_PADDING_LEN: usize = 5;
293
294        // Prevent amplification attacks and reset loops by ensuring we pad to at most 1 byte
295        // smaller than the inciting packet.
296        let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
297            Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
298            _ => {
299                debug!(
300                    "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
301                    inciting_dgram_len
302                );
303                return None;
304            }
305        };
306
307        debug!(%dst_cid, %network_path.remote, "sending stateless reset");
308        self.last_stateless_reset = Some(now);
309        // Resets with at least this much padding can't possibly be distinguished from real packets
310        const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
311        let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
312            max_padding_len
313        } else {
314            self.rng
315                .random_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
316        };
317        buf.reserve(padding_len + RESET_TOKEN_SIZE);
318        buf.resize(padding_len, 0);
319        self.rng.fill_bytes(&mut buf[0..padding_len]);
320        buf[0] = 0b0100_0000 | (buf[0] >> 2);
321        buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
322
323        debug_assert!(buf.len() < inciting_dgram_len);
324
325        Some(Transmit {
326            destination: network_path.remote,
327            ecn: None,
328            size: buf.len(),
329            segment_size: None,
330            src_ip: network_path.local_ip,
331        })
332    }
333
334    /// Initiate a connection
335    pub fn connect(
336        &mut self,
337        now: Instant,
338        config: ClientConfig,
339        remote: SocketAddr,
340        server_name: &str,
341    ) -> Result<(ConnectionHandle, Connection), ConnectError> {
342        if self.cids_exhausted() {
343            return Err(ConnectError::CidsExhausted);
344        }
345        if remote.port() == 0 || remote.ip().is_unspecified() {
346            return Err(ConnectError::InvalidRemoteAddress(remote));
347        }
348        if !self.config.supported_versions.contains(&config.version) {
349            return Err(ConnectError::UnsupportedVersion);
350        }
351
352        let remote_id = (config.initial_dst_cid_provider)();
353        trace!(initial_dcid = %remote_id);
354
355        let ch = ConnectionHandle(self.connections.vacant_key());
356        let loc_cid = self.new_cid(ch, PathId::ZERO);
357        let params = TransportParameters::new(
358            &config.transport,
359            &self.config,
360            self.local_cid_generator.as_ref(),
361            loc_cid,
362            None,
363            &mut self.rng,
364        );
365        let tls = config
366            .crypto
367            .start_session(config.version, server_name, &params)?;
368
369        let conn = self.add_connection(
370            ch,
371            config.version,
372            remote_id,
373            loc_cid,
374            remote_id,
375            FourTuple {
376                remote,
377                local_ip: None,
378            },
379            now,
380            tls,
381            config.transport,
382            SideArgs::Client {
383                token_store: config.token_store,
384                server_name: server_name.into(),
385            },
386            &params,
387        );
388        Ok((ch, conn))
389    }
390
391    /// Generates new CIDs and creates message to send to the connection state
392    fn send_new_identifiers(
393        &mut self,
394        path_id: PathId,
395        now: Instant,
396        ch: ConnectionHandle,
397        num: u64,
398    ) -> ConnectionEvent {
399        let mut ids = vec![];
400        for _ in 0..num {
401            let id = self.new_cid(ch, path_id);
402            let cid_meta = self.connections[ch].loc_cids.entry(path_id).or_default();
403            let sequence = cid_meta.issued;
404            cid_meta.issued += 1;
405            cid_meta.cids.insert(sequence, id);
406            ids.push(IssuedCid {
407                path_id,
408                sequence,
409                id,
410                reset_token: ResetToken::new(&*self.config.reset_key, id),
411            });
412        }
413        ConnectionEvent(ConnectionEventInner::NewIdentifiers(
414            ids,
415            now,
416            self.local_cid_generator.cid_len(),
417            self.local_cid_generator.cid_lifetime(),
418        ))
419    }
420
421    /// Generate a connection ID for `ch`
422    fn new_cid(&mut self, ch: ConnectionHandle, path_id: PathId) -> ConnectionId {
423        loop {
424            let cid = self.local_cid_generator.generate_cid();
425            if cid.is_empty() {
426                // Zero-length CID; nothing to track
427                debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
428                return cid;
429            }
430            if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
431                e.insert((ch, path_id));
432                break cid;
433            }
434        }
435    }
436
437    fn handle_first_packet(
438        &mut self,
439        datagram_len: usize,
440        event: DatagramConnectionEvent,
441        network_path: FourTuple,
442        buf: &mut Vec<u8>,
443    ) -> Option<DatagramEvent> {
444        let dst_cid = event.first_decode.dst_cid();
445        let header = event.first_decode.initial_header().unwrap();
446
447        let Some(server_config) = &self.server_config else {
448            debug!("packet for unrecognized connection {}", dst_cid);
449            return self
450                .stateless_reset(event.now, datagram_len, network_path, dst_cid, buf)
451                .map(DatagramEvent::Response);
452        };
453
454        if datagram_len < MIN_INITIAL_SIZE as usize {
455            debug!("ignoring short initial for connection {}", dst_cid);
456            return None;
457        }
458
459        let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
460            Ok(keys) => keys,
461            Err(UnsupportedVersion) => {
462                // This probably indicates that the user set supported_versions incorrectly in
463                // `EndpointConfig`.
464                debug!(
465                    "ignoring initial packet version {:#x} unsupported by cryptographic layer",
466                    header.version
467                );
468                return None;
469            }
470        };
471
472        if let Err(reason) = self.early_validate_first_packet(header) {
473            return Some(DatagramEvent::Response(self.initial_close(
474                header.version,
475                network_path,
476                &crypto,
477                header.src_cid,
478                reason,
479                buf,
480            )));
481        }
482
483        let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
484            Ok(packet) => packet,
485            Err(e) => {
486                trace!("unable to decode initial packet: {}", e);
487                return None;
488            }
489        };
490
491        if !packet.reserved_bits_valid() {
492            debug!("dropping connection attempt with invalid reserved bits");
493            return None;
494        }
495
496        let Header::Initial(header) = packet.header else {
497            panic!("non-initial packet in handle_first_packet()");
498        };
499
500        let server_config = self.server_config.as_ref().unwrap().clone();
501
502        let token = match IncomingToken::from_header(&header, &server_config, network_path.remote) {
503            Ok(token) => token,
504            Err(InvalidRetryTokenError) => {
505                debug!("rejecting invalid retry token");
506                return Some(DatagramEvent::Response(self.initial_close(
507                    header.version,
508                    network_path,
509                    &crypto,
510                    header.src_cid,
511                    TransportError::INVALID_TOKEN(""),
512                    buf,
513                )));
514            }
515        };
516
517        let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
518        self.index
519            .insert_initial_incoming(header.dst_cid, incoming_idx);
520
521        Some(DatagramEvent::NewConnection(Incoming {
522            received_at: event.now,
523            network_path,
524            ecn: event.ecn,
525            packet: InitialPacket {
526                header,
527                header_data: packet.header_data,
528                payload: packet.payload,
529            },
530            rest: event.remaining,
531            crypto,
532            token,
533            incoming_idx,
534            improper_drop_warner: IncomingImproperDropWarner,
535        }))
536    }
537
538    /// Attempt to accept this incoming connection (an error may still occur)
539    // box err to avoid clippy::result_large_err
540    pub fn accept(
541        &mut self,
542        mut incoming: Incoming,
543        now: Instant,
544        buf: &mut Vec<u8>,
545        server_config: Option<Arc<ServerConfig>>,
546    ) -> Result<(ConnectionHandle, Connection), Box<AcceptError>> {
547        let remote_address_validated = incoming.remote_address_validated();
548        incoming.improper_drop_warner.dismiss();
549        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
550        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
551
552        let packet_number = incoming.packet.header.number.expand(0);
553        let InitialHeader {
554            src_cid,
555            dst_cid,
556            version,
557            ..
558        } = incoming.packet.header;
559        let server_config =
560            server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
561
562        if server_config
563            .transport
564            .max_idle_timeout
565            .is_some_and(|timeout| {
566                incoming.received_at + Duration::from_millis(timeout.into()) <= now
567            })
568        {
569            debug!("abandoning accept of stale initial");
570            self.index.remove_initial(dst_cid);
571            return Err(Box::new(AcceptError {
572                cause: ConnectionError::TimedOut,
573                response: None,
574            }));
575        }
576
577        if self.cids_exhausted() {
578            debug!("refusing connection");
579            self.index.remove_initial(dst_cid);
580            return Err(Box::new(AcceptError {
581                cause: ConnectionError::CidsExhausted,
582                response: Some(self.initial_close(
583                    version,
584                    incoming.network_path,
585                    &incoming.crypto,
586                    src_cid,
587                    TransportError::CONNECTION_REFUSED(""),
588                    buf,
589                )),
590            }));
591        }
592
593        if incoming
594            .crypto
595            .packet
596            .remote
597            .decrypt(
598                PathId::ZERO,
599                packet_number,
600                &incoming.packet.header_data,
601                &mut incoming.packet.payload,
602            )
603            .is_err()
604        {
605            debug!(packet_number, "failed to authenticate initial packet");
606            self.index.remove_initial(dst_cid);
607            return Err(Box::new(AcceptError {
608                cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
609                response: None,
610            }));
611        };
612
613        let ch = ConnectionHandle(self.connections.vacant_key());
614        let loc_cid = self.new_cid(ch, PathId::ZERO);
615        let mut params = TransportParameters::new(
616            &server_config.transport,
617            &self.config,
618            self.local_cid_generator.as_ref(),
619            loc_cid,
620            Some(&server_config),
621            &mut self.rng,
622        );
623        params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
624        params.original_dst_cid = Some(incoming.token.orig_dst_cid);
625        params.retry_src_cid = incoming.token.retry_src_cid;
626        let mut pref_addr_cid = None;
627        if server_config.has_preferred_address() {
628            let cid = self.new_cid(ch, PathId::ZERO);
629            pref_addr_cid = Some(cid);
630            params.preferred_address = Some(PreferredAddress {
631                address_v4: server_config.preferred_address_v4,
632                address_v6: server_config.preferred_address_v6,
633                connection_id: cid,
634                stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
635            });
636        }
637
638        let tls = server_config.crypto.clone().start_session(version, &params);
639        let transport_config = server_config.transport.clone();
640        let mut conn = self.add_connection(
641            ch,
642            version,
643            dst_cid,
644            loc_cid,
645            src_cid,
646            incoming.network_path,
647            incoming.received_at,
648            tls,
649            transport_config,
650            SideArgs::Server {
651                server_config,
652                pref_addr_cid,
653                path_validated: remote_address_validated,
654            },
655            &params,
656        );
657        self.index.insert_initial(dst_cid, ch);
658
659        match conn.handle_first_packet(
660            incoming.received_at,
661            incoming.network_path,
662            incoming.ecn,
663            packet_number,
664            incoming.packet,
665            incoming.rest,
666        ) {
667            Ok(()) => {
668                trace!(id = ch.0, icid = %dst_cid, "new connection");
669
670                for event in incoming_buffer.datagrams {
671                    conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
672                }
673
674                Ok((ch, conn))
675            }
676            Err(e) => {
677                debug!("handshake failed: {}", e);
678                self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
679                let response = match e {
680                    ConnectionError::TransportError(ref e) => Some(self.initial_close(
681                        version,
682                        incoming.network_path,
683                        &incoming.crypto,
684                        src_cid,
685                        e.clone(),
686                        buf,
687                    )),
688                    _ => None,
689                };
690                Err(Box::new(AcceptError { cause: e, response }))
691            }
692        }
693    }
694
695    /// Check if we should refuse a connection attempt regardless of the packet's contents
696    fn early_validate_first_packet(
697        &mut self,
698        header: &ProtectedInitialHeader,
699    ) -> Result<(), TransportError> {
700        let config = &self.server_config.as_ref().unwrap();
701        if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
702            return Err(TransportError::CONNECTION_REFUSED(""));
703        }
704
705        // RFC9000 §7.2 dictates that initial (client-chosen) destination CIDs must be at least 8
706        // bytes. If this is a Retry packet, then the length must instead match our usual CID
707        // length. If we ever issue non-Retry address validation tokens via `NEW_TOKEN`, then we'll
708        // also need to validate CID length for those after decoding the token.
709        if header.dst_cid.len() < 8
710            && (header.token_pos.is_empty()
711                || header.dst_cid.len() != self.local_cid_generator.cid_len())
712        {
713            debug!(
714                "rejecting connection due to invalid DCID length {}",
715                header.dst_cid.len()
716            );
717            return Err(TransportError::PROTOCOL_VIOLATION(
718                "invalid destination CID length",
719            ));
720        }
721
722        Ok(())
723    }
724
725    /// Reject this incoming connection attempt
726    pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
727        self.clean_up_incoming(&incoming);
728        incoming.improper_drop_warner.dismiss();
729
730        self.initial_close(
731            incoming.packet.header.version,
732            incoming.network_path,
733            &incoming.crypto,
734            incoming.packet.header.src_cid,
735            TransportError::CONNECTION_REFUSED(""),
736            buf,
737        )
738    }
739
740    /// Respond with a retry packet, requiring the client to retry with address validation
741    ///
742    /// Errors if `incoming.may_retry()` is false.
743    pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
744        if !incoming.may_retry() {
745            return Err(RetryError(Box::new(incoming)));
746        }
747
748        self.clean_up_incoming(&incoming);
749        incoming.improper_drop_warner.dismiss();
750
751        let server_config = self.server_config.as_ref().unwrap();
752
753        // First Initial
754        // The peer will use this as the DCID of its following Initials. Initial DCIDs are
755        // looked up separately from Handshake/Data DCIDs, so there is no risk of collision
756        // with established connections. In the unlikely event that a collision occurs
757        // between two connections in the initial phase, both will fail fast and may be
758        // retried by the application layer.
759        let loc_cid = self.local_cid_generator.generate_cid();
760
761        let payload = TokenPayload::Retry {
762            address: incoming.network_path.remote,
763            orig_dst_cid: incoming.packet.header.dst_cid,
764            issued: server_config.time_source.now(),
765        };
766        let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
767
768        let header = Header::Retry {
769            src_cid: loc_cid,
770            dst_cid: incoming.packet.header.src_cid,
771            version: incoming.packet.header.version,
772        };
773
774        let encode = header.encode(buf);
775        buf.put_slice(&token);
776        buf.extend_from_slice(&server_config.crypto.retry_tag(
777            incoming.packet.header.version,
778            incoming.packet.header.dst_cid,
779            buf,
780        ));
781        encode.finish(buf, &*incoming.crypto.header.local, None);
782
783        Ok(Transmit {
784            destination: incoming.network_path.remote,
785            ecn: None,
786            size: buf.len(),
787            segment_size: None,
788            src_ip: incoming.network_path.local_ip,
789        })
790    }
791
792    /// Ignore this incoming connection attempt, not sending any packet in response
793    ///
794    /// Doing this actively, rather than merely dropping the [`Incoming`], is necessary to prevent
795    /// memory leaks due to state within [`Endpoint`] tracking the incoming connection.
796    pub fn ignore(&mut self, incoming: Incoming) {
797        self.clean_up_incoming(&incoming);
798        incoming.improper_drop_warner.dismiss();
799    }
800
801    /// Clean up endpoint data structures associated with an `Incoming`.
802    fn clean_up_incoming(&mut self, incoming: &Incoming) {
803        self.index.remove_initial(incoming.packet.header.dst_cid);
804        let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
805        self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
806    }
807
808    fn add_connection(
809        &mut self,
810        ch: ConnectionHandle,
811        version: u32,
812        init_cid: ConnectionId,
813        loc_cid: ConnectionId,
814        rem_cid: ConnectionId,
815        network_path: FourTuple,
816        now: Instant,
817        tls: Box<dyn crypto::Session>,
818        transport_config: Arc<TransportConfig>,
819        side_args: SideArgs,
820        // Only used for qlog.
821        params: &TransportParameters,
822    ) -> Connection {
823        let mut rng_seed = [0; 32];
824        self.rng.fill_bytes(&mut rng_seed);
825        let side = side_args.side();
826        let pref_addr_cid = side_args.pref_addr_cid();
827
828        let qlog =
829            transport_config.create_qlog_sink(side_args.side(), network_path.remote, init_cid, now);
830
831        qlog.emit_connection_started(
832            now,
833            loc_cid,
834            rem_cid,
835            network_path.remote,
836            network_path.local_ip,
837            params,
838        );
839
840        let conn = Connection::new(
841            self.config.clone(),
842            transport_config,
843            init_cid,
844            loc_cid,
845            rem_cid,
846            network_path,
847            tls,
848            self.local_cid_generator.as_ref(),
849            now,
850            version,
851            self.allow_mtud,
852            rng_seed,
853            side_args,
854            qlog,
855        );
856
857        let mut path_cids = PathLocalCids::default();
858        path_cids.cids.insert(path_cids.issued, loc_cid);
859        path_cids.issued += 1;
860
861        if let Some(cid) = pref_addr_cid {
862            debug_assert_eq!(path_cids.issued, 1, "preferred address cid seq must be 1");
863            path_cids.cids.insert(path_cids.issued, cid);
864            path_cids.issued += 1;
865        }
866
867        let id = self.connections.insert(ConnectionMeta {
868            init_cid,
869            loc_cids: FxHashMap::from_iter([(PathId::ZERO, path_cids)]),
870            network_path,
871            side,
872            reset_token: Default::default(),
873        });
874        debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
875
876        self.index.insert_conn(network_path, loc_cid, ch, side);
877
878        conn
879    }
880
881    fn initial_close(
882        &mut self,
883        version: u32,
884        network_path: FourTuple,
885        crypto: &Keys,
886        remote_id: ConnectionId,
887        reason: TransportError,
888        buf: &mut Vec<u8>,
889    ) -> Transmit {
890        // We don't need to worry about CID collisions in initial closes because the peer
891        // shouldn't respond, and if it does, and the CID collides, we'll just drop the
892        // unexpected response.
893        let local_id = self.local_cid_generator.generate_cid();
894        let number = PacketNumber::U8(0);
895        let header = Header::Initial(InitialHeader {
896            dst_cid: remote_id,
897            src_cid: local_id,
898            number,
899            token: Bytes::new(),
900            version,
901        });
902
903        let partial_encode = header.encode(buf);
904        let max_len =
905            INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
906        frame::Close::from(reason).encoder(max_len).encode(buf);
907        buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
908        partial_encode.finish(
909            buf,
910            &*crypto.header.local,
911            Some((0, Default::default(), &*crypto.packet.local)),
912        );
913        Transmit {
914            destination: network_path.remote,
915            ecn: None,
916            size: buf.len(),
917            segment_size: None,
918            src_ip: network_path.local_ip,
919        }
920    }
921
922    /// Access the configuration used by this endpoint
923    pub fn config(&self) -> &EndpointConfig {
924        &self.config
925    }
926
927    /// Number of connections that are currently open
928    pub fn open_connections(&self) -> usize {
929        self.connections.len()
930    }
931
932    /// Counter for the number of bytes currently used
933    /// in the buffers for Initial and 0-RTT messages for pending incoming connections
934    pub fn incoming_buffer_bytes(&self) -> u64 {
935        self.all_incoming_buffers_total_bytes
936    }
937
938    #[cfg(test)]
939    pub(crate) fn known_connections(&self) -> usize {
940        let x = self.connections.len();
941        debug_assert_eq!(x, self.index.connection_ids_initial.len());
942        // Not all connections have known reset tokens
943        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
944        // Not all connections have unique remotes, and 0-length CIDs might not be in use.
945        debug_assert!(x >= self.index.incoming_connection_remotes.len());
946        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
947        x
948    }
949
950    #[cfg(test)]
951    pub(crate) fn known_cids(&self) -> usize {
952        self.index.connection_ids.len()
953    }
954
955    /// Whether we've used up 3/4 of the available CID space
956    ///
957    /// We leave some space unused so that `new_cid` can be relied upon to finish quickly. We don't
958    /// bother to check when CID longer than 4 bytes are used because 2^40 connections is a lot.
959    fn cids_exhausted(&self) -> bool {
960        self.local_cid_generator.cid_len() <= 4
961            && self.local_cid_generator.cid_len() != 0
962            && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
963                - self.index.connection_ids.len())
964                < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
965    }
966}
967
968impl fmt::Debug for Endpoint {
969    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
970        fmt.debug_struct("Endpoint")
971            .field("rng", &self.rng)
972            .field("index", &self.index)
973            .field("connections", &self.connections)
974            .field("config", &self.config)
975            .field("server_config", &self.server_config)
976            // incoming_buffers too large
977            .field("incoming_buffers.len", &self.incoming_buffers.len())
978            .field(
979                "all_incoming_buffers_total_bytes",
980                &self.all_incoming_buffers_total_bytes,
981            )
982            .finish()
983    }
984}
985
986/// Buffered Initial and 0-RTT messages for a pending incoming connection
987#[derive(Default)]
988struct IncomingBuffer {
989    datagrams: Vec<DatagramConnectionEvent>,
990    total_bytes: u64,
991}
992
993/// Part of protocol state incoming datagrams can be routed to
994#[derive(Copy, Clone, Debug)]
995enum RouteDatagramTo {
996    Incoming(usize),
997    Connection(ConnectionHandle, PathId),
998}
999
1000/// Maps packets to existing connections
1001#[derive(Default, Debug)]
1002struct ConnectionIndex {
1003    /// Identifies connections based on the initial DCID the peer utilized
1004    ///
1005    /// Uses a standard `HashMap` to protect against hash collision attacks.
1006    ///
1007    /// Used by the server, not the client.
1008    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
1009    /// Identifies connections based on locally created CIDs
1010    ///
1011    /// Uses a cheaper hash function since keys are locally created
1012    connection_ids: FxHashMap<ConnectionId, (ConnectionHandle, PathId)>,
1013    /// Identifies incoming connections with zero-length CIDs
1014    ///
1015    /// Uses a standard `HashMap` to protect against hash collision attacks.
1016    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
1017    /// Identifies outgoing connections with zero-length CIDs
1018    ///
1019    /// We don't yet support explicit source addresses for client connections, and zero-length CIDs
1020    /// require a unique 4-tuple, so at most one client connection with zero-length local CIDs
1021    /// may be established per remote. We must omit the local address from the key because we don't
1022    /// necessarily know what address we're sending from, and hence receiving at.
1023    ///
1024    /// Uses a standard `HashMap` to protect against hash collision attacks.
1025    // TODO(matheus23): It's possible this could be changed now that we track the full 4-tuple on the client side, too.
1026    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
1027    /// Reset tokens provided by the peer for the CID each connection is currently sending to
1028    ///
1029    /// Incoming stateless resets do not have correct CIDs, so we need this to identify the correct
1030    /// recipient, if any.
1031    connection_reset_tokens: ResetTokenTable,
1032}
1033
1034impl ConnectionIndex {
1035    /// Associate an incoming connection with its initial destination CID
1036    fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
1037        if dst_cid.is_empty() {
1038            return;
1039        }
1040        self.connection_ids_initial
1041            .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1042    }
1043
1044    /// Remove an association with an initial destination CID
1045    fn remove_initial(&mut self, dst_cid: ConnectionId) {
1046        if dst_cid.is_empty() {
1047            return;
1048        }
1049        let removed = self.connection_ids_initial.remove(&dst_cid);
1050        debug_assert!(removed.is_some());
1051    }
1052
1053    /// Associate a connection with its initial destination CID
1054    fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1055        if dst_cid.is_empty() {
1056            return;
1057        }
1058        self.connection_ids_initial.insert(
1059            dst_cid,
1060            RouteDatagramTo::Connection(connection, PathId::ZERO),
1061        );
1062    }
1063
1064    /// Associate a connection with its first locally-chosen destination CID if used, or otherwise
1065    /// its current 4-tuple
1066    fn insert_conn(
1067        &mut self,
1068        network_path: FourTuple,
1069        dst_cid: ConnectionId,
1070        connection: ConnectionHandle,
1071        side: Side,
1072    ) {
1073        match dst_cid.len() {
1074            0 => match side {
1075                Side::Server => {
1076                    self.incoming_connection_remotes
1077                        .insert(network_path, connection);
1078                }
1079                Side::Client => {
1080                    self.outgoing_connection_remotes
1081                        .insert(network_path.remote, connection);
1082                }
1083            },
1084            _ => {
1085                self.connection_ids
1086                    .insert(dst_cid, (connection, PathId::ZERO));
1087            }
1088        }
1089    }
1090
1091    /// Discard a connection ID
1092    fn retire(&mut self, dst_cid: ConnectionId) {
1093        self.connection_ids.remove(&dst_cid);
1094    }
1095
1096    /// Remove all references to a connection
1097    fn remove(&mut self, conn: &ConnectionMeta) {
1098        if conn.side.is_server() {
1099            self.remove_initial(conn.init_cid);
1100        }
1101        for cid in conn.loc_cids.values().flat_map(|pcids| pcids.cids.values()) {
1102            self.connection_ids.remove(cid);
1103        }
1104        self.incoming_connection_remotes.remove(&conn.network_path);
1105        self.outgoing_connection_remotes
1106            .remove(&conn.network_path.remote);
1107        for (remote, token) in conn.reset_token.values() {
1108            self.connection_reset_tokens.remove(*remote, *token);
1109        }
1110    }
1111
1112    /// Find the existing connection that `datagram` should be routed to, if any
1113    fn get(&self, network_path: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1114        if !datagram.dst_cid().is_empty() {
1115            if let Some(&(ch, path_id)) = self.connection_ids.get(&datagram.dst_cid()) {
1116                return Some(RouteDatagramTo::Connection(ch, path_id));
1117            }
1118        }
1119        if datagram.is_initial() || datagram.is_0rtt() {
1120            if let Some(&ch) = self.connection_ids_initial.get(&datagram.dst_cid()) {
1121                return Some(ch);
1122            }
1123        }
1124        if datagram.dst_cid().is_empty() {
1125            if let Some(&ch) = self.incoming_connection_remotes.get(network_path) {
1126                // Never multipath because QUIC-MULTIPATH 1.1 mandates the use of non-zero
1127                // length CIDs.  So this is always PathId::ZERO.
1128                return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1129            }
1130            if let Some(&ch) = self.outgoing_connection_remotes.get(&network_path.remote) {
1131                // Like above, QUIC-MULTIPATH 1.1 mandates the use of non-zero length CIDs.
1132                return Some(RouteDatagramTo::Connection(ch, PathId::ZERO));
1133            }
1134        }
1135        let data = datagram.data();
1136        if data.len() < RESET_TOKEN_SIZE {
1137            return None;
1138        }
1139        // For stateless resets the PathId is meaningless since it closes the entire
1140        // connection regardless of path.  So use PathId::ZERO.
1141        self.connection_reset_tokens
1142            .get(network_path.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1143            .cloned()
1144            .map(|ch| RouteDatagramTo::Connection(ch, PathId::ZERO))
1145    }
1146}
1147
1148#[derive(Debug)]
1149pub(crate) struct ConnectionMeta {
1150    init_cid: ConnectionId,
1151    /// Locally issues CIDs for each path
1152    loc_cids: FxHashMap<PathId, PathLocalCids>,
1153    /// Remote/local addresses the connection began with
1154    ///
1155    /// Only needed to support connections with zero-length CIDs, which cannot migrate, so we don't
1156    /// bother keeping it up to date.
1157    network_path: FourTuple,
1158    side: Side,
1159    /// Reset tokens provided by the peer for CIDs we're currently sending to
1160    ///
1161    /// Since each reset token is for a CID, it is also for a fixed remote address which is
1162    /// also stored. This allows us to look up which reset tokens we might expect from a
1163    /// given remote address, see [`ResetTokenTable`].
1164    ///
1165    /// Each path has its own active CID. We use the [`PathId`] as a unique index, allowing
1166    /// us to retire the reset token when a path is abandoned.
1167    // TODO(matheus23): Should be migrated to make reset tokens per 4-tuple instead of per remote addr
1168    reset_token: FxHashMap<PathId, (SocketAddr, ResetToken)>,
1169}
1170
1171/// Local connection IDs for a single path
1172#[derive(Debug, Default)]
1173struct PathLocalCids {
1174    /// Number of connection IDs that have been issued in (PATH_)NEW_CONNECTION_ID frames
1175    ///
1176    /// Another way of saying this is that this is the next sequence number to be issued.
1177    issued: u64,
1178    /// Issues CIDs indexed by their sequence number.
1179    cids: FxHashMap<u64, ConnectionId>,
1180}
1181
1182/// Internal identifier for a `Connection` currently associated with an endpoint
1183#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
1184pub struct ConnectionHandle(pub usize);
1185
1186impl From<ConnectionHandle> for usize {
1187    fn from(x: ConnectionHandle) -> Self {
1188        x.0
1189    }
1190}
1191
1192impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1193    type Output = ConnectionMeta;
1194    fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1195        &self[ch.0]
1196    }
1197}
1198
1199impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1200    fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1201        &mut self[ch.0]
1202    }
1203}
1204
1205/// Event resulting from processing a single datagram
1206pub enum DatagramEvent {
1207    /// The datagram is redirected to its `Connection`
1208    ConnectionEvent(ConnectionHandle, ConnectionEvent),
1209    /// The datagram may result in starting a new `Connection`
1210    NewConnection(Incoming),
1211    /// Response generated directly by the endpoint
1212    Response(Transmit),
1213}
1214
1215/// An incoming connection for which the server has not yet begun its part of the handshake.
1216#[derive(derive_more::Debug)]
1217pub struct Incoming {
1218    #[debug(skip)]
1219    received_at: Instant,
1220    network_path: FourTuple,
1221    ecn: Option<EcnCodepoint>,
1222    #[debug(skip)]
1223    packet: InitialPacket,
1224    #[debug(skip)]
1225    rest: Option<BytesMut>,
1226    #[debug(skip)]
1227    crypto: Keys,
1228    token: IncomingToken,
1229    incoming_idx: usize,
1230    #[debug(skip)]
1231    improper_drop_warner: IncomingImproperDropWarner,
1232}
1233
1234impl Incoming {
1235    /// The local IP address which was used when the peer established the connection
1236    ///
1237    /// This has the same behavior as [`Connection::local_ip`].
1238    pub fn local_ip(&self) -> Option<IpAddr> {
1239        self.network_path.local_ip
1240    }
1241
1242    /// The peer's UDP address
1243    pub fn remote_address(&self) -> SocketAddr {
1244        self.network_path.remote
1245    }
1246
1247    /// Whether the socket address that is initiating this connection has been validated
1248    ///
1249    /// This means that the sender of the initial packet has proved that they can receive traffic
1250    /// sent to `self.remote_address()`.
1251    ///
1252    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1253    /// The inverse is not guaranteed.
1254    pub fn remote_address_validated(&self) -> bool {
1255        self.token.validated
1256    }
1257
1258    /// Whether it is legal to respond with a retry packet
1259    ///
1260    /// If `self.remote_address_validated()` is false, `self.may_retry()` is guaranteed to be true.
1261    /// The inverse is not guaranteed.
1262    pub fn may_retry(&self) -> bool {
1263        self.token.retry_src_cid.is_none()
1264    }
1265
1266    /// The original destination connection ID sent by the client
1267    pub fn orig_dst_cid(&self) -> ConnectionId {
1268        self.token.orig_dst_cid
1269    }
1270}
1271
1272struct IncomingImproperDropWarner;
1273
1274impl IncomingImproperDropWarner {
1275    fn dismiss(self) {
1276        mem::forget(self);
1277    }
1278}
1279
1280impl Drop for IncomingImproperDropWarner {
1281    fn drop(&mut self) {
1282        warn!(
1283            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
1284               (may cause memory leak and eventual inability to accept new connections)"
1285        );
1286    }
1287}
1288
1289/// Errors in the parameters being used to create a new connection
1290///
1291/// These arise before any I/O has been performed.
1292#[derive(Debug, Error, Clone, PartialEq, Eq)]
1293pub enum ConnectError {
1294    /// The endpoint can no longer create new connections
1295    ///
1296    /// Indicates that a necessary component of the endpoint has been dropped or otherwise disabled.
1297    #[error("endpoint stopping")]
1298    EndpointStopping,
1299    /// The connection could not be created because not enough of the CID space is available
1300    ///
1301    /// Try using longer connection IDs
1302    #[error("CIDs exhausted")]
1303    CidsExhausted,
1304    /// The given server name was malformed
1305    #[error("invalid server name: {0}")]
1306    InvalidServerName(String),
1307    /// The remote [`SocketAddr`] supplied was malformed
1308    ///
1309    /// Examples include attempting to connect to port 0, or using an inappropriate address family.
1310    #[error("invalid remote address: {0}")]
1311    InvalidRemoteAddress(SocketAddr),
1312    /// No default client configuration was set up
1313    ///
1314    /// Use `Endpoint::connect_with` to specify a client configuration.
1315    #[error("no default client config")]
1316    NoDefaultClientConfig,
1317    /// The local endpoint does not support the QUIC version specified in the client configuration
1318    #[error("unsupported QUIC version")]
1319    UnsupportedVersion,
1320}
1321
1322/// Error type for attempting to accept an [`Incoming`]
1323#[derive(Debug)]
1324pub struct AcceptError {
1325    /// Underlying error describing reason for failure
1326    pub cause: ConnectionError,
1327    /// Optional response to transmit back
1328    pub response: Option<Transmit>,
1329}
1330
1331/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
1332#[derive(Debug, Error)]
1333#[error("retry() with validated Incoming")]
1334pub struct RetryError(Box<Incoming>);
1335
1336impl RetryError {
1337    /// Get the [`Incoming`]
1338    pub fn into_incoming(self) -> Incoming {
1339        *self.0
1340    }
1341}
1342
1343/// Reset Tokens which are associated with peer socket addresses
1344///
1345/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
1346/// peer generated and might be usable for hash collision attacks.
1347#[derive(Default, Debug)]
1348struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1349
1350impl ResetTokenTable {
1351    fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1352        self.0
1353            .entry(remote)
1354            .or_default()
1355            .insert(token, ch)
1356            .is_some()
1357    }
1358
1359    fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1360        use std::collections::hash_map::Entry;
1361        match self.0.entry(remote) {
1362            Entry::Vacant(_) => {}
1363            Entry::Occupied(mut e) => {
1364                e.get_mut().remove(&token);
1365                if e.get().is_empty() {
1366                    e.remove_entry();
1367                }
1368            }
1369        }
1370    }
1371
1372    fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1373        let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1374        self.0.get(&remote)?.get(&token)
1375    }
1376}