1use std::{
2 collections::VecDeque,
3 fmt,
4 future::Future,
5 io::{self, IoSliceMut},
6 mem,
7 net::{SocketAddr, SocketAddrV6},
8 pin::Pin,
9 str,
10 sync::{Arc, Mutex},
11 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
12};
13
14#[cfg(all(
15 not(wasm_browser),
16 any(feature = "runtime-tokio", feature = "runtime-smol"),
17 any(feature = "aws-lc-rs", feature = "ring"),
18))]
19use crate::runtime::default_runtime;
20use crate::{
21 Instant,
22 runtime::{AsyncUdpSocket, Runtime, UdpSender},
23 udp_transmit,
24};
25use bytes::{Bytes, BytesMut};
26use pin_project_lite::pin_project;
27use proto::{
28 self as proto, ClientConfig, ConnectError, ConnectionError, ConnectionHandle, DatagramEvent,
29 EndpointEvent, ServerConfig,
30};
31use rustc_hash::FxHashMap;
32#[cfg(all(
33 not(wasm_browser),
34 any(feature = "runtime-tokio", feature = "runtime-smol"),
35 any(feature = "aws-lc-rs", feature = "ring"),
36))]
37use socket2::{Domain, Protocol, Socket, Type};
38use tokio::sync::{Notify, futures::Notified, mpsc};
39use tracing::{Instrument, Span};
40use udp::{BATCH_SIZE, RecvMeta};
41
42use crate::{
43 ConnectionEvent, EndpointConfig, IO_LOOP_BOUND, RECV_TIME_BOUND, VarInt,
44 connection::Connecting, incoming::Incoming, work_limiter::WorkLimiter,
45};
46
/// Handle to an endpoint.
///
/// Holds a counted reference to the state shared with the driver task, an optional default
/// client configuration, and the runtime used for spawning, socket wrapping and time.
#[derive(Debug, Clone)]
pub struct Endpoint {
    /// Counted reference to the state shared with the [`EndpointDriver`].
    pub(crate) inner: EndpointRef,
    /// Configuration used by [`Endpoint::connect`]; `None` until
    /// [`Endpoint::set_default_client_config`] is called.
    pub(crate) default_client_config: Option<ClientConfig>,
    /// Runtime used to spawn the driver, wrap UDP sockets and read the current time.
    runtime: Arc<dyn Runtime>,
}
59
60impl Endpoint {
61 #[cfg(all(
79 not(wasm_browser),
80 any(feature = "runtime-tokio", feature = "runtime-smol"),
81 any(feature = "aws-lc-rs", feature = "ring"), ))]
83 pub fn client(addr: SocketAddr) -> io::Result<Self> {
84 let socket = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP))?;
85 if addr.is_ipv6() {
86 if let Err(e) = socket.set_only_v6(false) {
87 tracing::debug!(%e, "unable to make socket dual-stack");
88 }
89 }
90 socket.bind(&addr.into())?;
91 let runtime =
92 default_runtime().ok_or_else(|| io::Error::other("no async runtime found"))?;
93 Self::new_with_abstract_socket(
94 EndpointConfig::default(),
95 None,
96 runtime.wrap_udp_socket(socket.into())?,
97 runtime,
98 )
99 }
100
101 pub fn stats(&self) -> EndpointStats {
103 self.inner.state.lock().unwrap().stats
104 }
105
106 #[cfg(all(
113 not(wasm_browser),
114 any(feature = "runtime-tokio", feature = "runtime-smol"),
115 any(feature = "aws-lc-rs", feature = "ring"), ))]
117 pub fn server(config: ServerConfig, addr: SocketAddr) -> io::Result<Self> {
118 let socket = std::net::UdpSocket::bind(addr)?;
119 let runtime =
120 default_runtime().ok_or_else(|| io::Error::other("no async runtime found"))?;
121 Self::new_with_abstract_socket(
122 EndpointConfig::default(),
123 Some(config),
124 runtime.wrap_udp_socket(socket)?,
125 runtime,
126 )
127 }
128
129 #[cfg(not(wasm_browser))]
131 pub fn new(
132 config: EndpointConfig,
133 server_config: Option<ServerConfig>,
134 socket: std::net::UdpSocket,
135 runtime: Arc<dyn Runtime>,
136 ) -> io::Result<Self> {
137 let socket = runtime.wrap_udp_socket(socket)?;
138 Self::new_with_abstract_socket(config, server_config, socket, runtime)
139 }
140
141 pub fn new_with_abstract_socket(
146 config: EndpointConfig,
147 server_config: Option<ServerConfig>,
148 socket: Box<dyn AsyncUdpSocket>,
149 runtime: Arc<dyn Runtime>,
150 ) -> io::Result<Self> {
151 let addr = socket.local_addr()?;
152 let allow_mtud = !socket.may_fragment();
153 let rc = EndpointRef::new(
154 socket,
155 proto::Endpoint::new(
156 Arc::new(config),
157 server_config.map(Arc::new),
158 allow_mtud,
159 None,
160 ),
161 addr.is_ipv6(),
162 runtime.clone(),
163 );
164 let driver = EndpointDriver(rc.clone());
165 runtime.spawn(Box::pin(
166 async {
167 if let Err(e) = driver.await {
168 tracing::error!("I/O error: {}", e);
169 }
170 }
171 .instrument(Span::current()),
172 ));
173 Ok(Self {
174 inner: rc,
175 default_client_config: None,
176 runtime,
177 })
178 }
179
180 pub fn accept(&self) -> Accept<'_> {
187 Accept {
188 endpoint: self,
189 notify: self.inner.shared.incoming.notified(),
190 }
191 }
192
193 pub fn set_default_client_config(&mut self, config: ClientConfig) {
195 self.default_client_config = Some(config);
196 }
197
198 pub fn connect(&self, addr: SocketAddr, server_name: &str) -> Result<Connecting, ConnectError> {
207 let config = match &self.default_client_config {
208 Some(config) => config.clone(),
209 None => return Err(ConnectError::NoDefaultClientConfig),
210 };
211
212 self.connect_with(config, addr, server_name)
213 }
214
215 pub fn connect_with(
221 &self,
222 config: ClientConfig,
223 addr: SocketAddr,
224 server_name: &str,
225 ) -> Result<Connecting, ConnectError> {
226 let mut endpoint = self.inner.state.lock().unwrap();
227 if endpoint.driver_lost || endpoint.recv_state.connections.close.is_some() {
228 return Err(ConnectError::EndpointStopping);
229 }
230 if addr.is_ipv6() && !endpoint.ipv6 {
231 return Err(ConnectError::InvalidRemoteAddress(addr));
232 }
233 let addr = if endpoint.ipv6 {
234 SocketAddr::V6(ensure_ipv6(addr))
235 } else {
236 addr
237 };
238
239 let (ch, conn) = endpoint
240 .inner
241 .connect(self.runtime.now(), config, addr, server_name)?;
242
243 let sender = endpoint.socket.create_sender();
244 endpoint.stats.outgoing_handshakes += 1;
245 Ok(endpoint
246 .recv_state
247 .connections
248 .insert(ch, conn, sender, self.runtime.clone()))
249 }
250
251 #[cfg(not(wasm_browser))]
255 pub fn rebind(&self, socket: std::net::UdpSocket) -> io::Result<()> {
256 self.rebind_abstract(self.runtime.wrap_udp_socket(socket)?)
257 }
258
259 pub fn rebind_abstract(&self, socket: Box<dyn AsyncUdpSocket>) -> io::Result<()> {
266 let addr = socket.local_addr()?;
267 let mut inner = self.inner.state.lock().unwrap();
268 inner.prev_socket = Some(mem::replace(&mut inner.socket, socket));
269 inner.ipv6 = addr.is_ipv6();
270
271 for sender in inner.recv_state.connections.senders.values() {
273 let _ = sender.send(ConnectionEvent::Rebind(inner.socket.create_sender()));
275 }
276 if let Some(driver) = inner.driver.take() {
277 driver.wake();
279 }
280
281 Ok(())
282 }
283
284 pub fn set_server_config(&self, server_config: Option<ServerConfig>) {
288 self.inner
289 .state
290 .lock()
291 .unwrap()
292 .inner
293 .set_server_config(server_config.map(Arc::new))
294 }
295
296 pub fn local_addr(&self) -> io::Result<SocketAddr> {
298 self.inner.state.lock().unwrap().socket.local_addr()
299 }
300
301 pub fn open_connections(&self) -> usize {
303 self.inner.state.lock().unwrap().inner.open_connections()
304 }
305
306 pub fn close(&self, error_code: VarInt, reason: &[u8]) {
312 let reason = Bytes::copy_from_slice(reason);
313 let mut endpoint = self.inner.state.lock().unwrap();
314 endpoint.recv_state.connections.close = Some((error_code, reason.clone()));
315 for sender in endpoint.recv_state.connections.senders.values() {
316 let _ = sender.send(ConnectionEvent::Close {
318 error_code,
319 reason: reason.clone(),
320 });
321 }
322 self.inner.shared.incoming.notify_waiters();
323 }
324
    /// Waits until the endpoint has no registered connections left.
    ///
    /// Returns immediately if the connection set is already empty. Otherwise waits for an
    /// "idle" notification and checks again.
    pub async fn wait_idle(&self) {
        loop {
            {
                let endpoint = &mut *self.inner.state.lock().unwrap();
                if endpoint.recv_state.connections.is_empty() {
                    break;
                }
                // The notification future is created while the lock is still held, so a
                // notification sent right after the check is not missed. The guard is released
                // at the end of this block, before the future is awaited.
                self.inner.shared.idle.notified()
            }
            .await;
        }
    }
348}
349
/// Handshake counters collected by an endpoint.
#[non_exhaustive]
#[derive(Debug, Default, Copy, Clone)]
pub struct EndpointStats {
    /// Incremented each time an incoming connection is successfully accepted.
    pub accepted_handshakes: u64,
    /// Incremented each time an outgoing connection is started.
    pub outgoing_handshakes: u64,
    /// Incremented each time an incoming connection is refused.
    pub refused_handshakes: u64,
    /// Incremented each time an incoming connection is ignored.
    pub ignored_handshakes: u64,
}
363
/// Future that performs the endpoint's I/O.
///
/// Each poll receives datagrams from the socket(s) and processes events sent by connections.
/// It completes once no endpoint handles remain and the connection set is empty, or with an
/// error if receiving fails.
#[must_use = "endpoint drivers must be spawned for I/O to occur"]
#[derive(Debug)]
pub(crate) struct EndpointDriver(pub(crate) EndpointRef);
377
378impl Future for EndpointDriver {
379 type Output = Result<(), io::Error>;
380
381 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
382 let mut endpoint = self.0.state.lock().unwrap();
383 if endpoint.driver.is_none() {
384 endpoint.driver = Some(cx.waker().clone());
385 }
386
387 let now = endpoint.runtime.now();
388 let mut keep_going = false;
389 keep_going |= endpoint.drive_recv(cx, now)?;
390 keep_going |= endpoint.handle_events(cx, &self.0.shared);
391
392 if !endpoint.recv_state.incoming.is_empty() {
393 self.0.shared.incoming.notify_waiters();
394 }
395
396 if endpoint.ref_count == 0 && endpoint.recv_state.connections.is_empty() {
397 Poll::Ready(Ok(()))
398 } else {
399 drop(endpoint);
400 if keep_going {
404 cx.waker().wake_by_ref();
405 }
406 Poll::Pending
407 }
408 }
409}
410
411impl Drop for EndpointDriver {
412 fn drop(&mut self) {
413 let mut endpoint = self.0.state.lock().unwrap();
414 endpoint.driver_lost = true;
415 self.0.shared.incoming.notify_waiters();
416 endpoint.recv_state.connections.senders.clear();
419 }
420}
421
/// State shared between endpoint handles, the driver and incoming connection attempts.
#[derive(Debug)]
pub(crate) struct EndpointInner {
    /// Mutable endpoint state, guarded by a mutex.
    pub(crate) state: Mutex<State>,
    /// Notification primitives that are used without holding the `state` lock.
    pub(crate) shared: Shared,
}
427
428impl EndpointInner {
429 pub(crate) fn accept(
430 &self,
431 incoming: proto::Incoming,
432 server_config: Option<Arc<ServerConfig>>,
433 ) -> Result<Connecting, ConnectionError> {
434 let mut state = self.state.lock().unwrap();
435 let mut response_buffer = Vec::new();
436 let now = state.runtime.now();
437 match state
438 .inner
439 .accept(incoming, now, &mut response_buffer, server_config)
440 {
441 Ok((handle, conn)) => {
442 state.stats.accepted_handshakes += 1;
443 let sender = state.socket.create_sender();
444 let runtime = state.runtime.clone();
445 Ok(state
446 .recv_state
447 .connections
448 .insert(handle, conn, sender, runtime))
449 }
450 Err(error) => {
451 if let Some(transmit) = error.response {
452 respond(transmit, &response_buffer, &mut state.sender);
453 }
454 Err(error.cause)
455 }
456 }
457 }
458
459 pub(crate) fn refuse(&self, incoming: proto::Incoming) {
460 let mut state = self.state.lock().unwrap();
461 state.stats.refused_handshakes += 1;
462 let mut response_buffer = Vec::new();
463 let transmit = state.inner.refuse(incoming, &mut response_buffer);
464 respond(transmit, &response_buffer, &mut state.sender);
465 }
466
467 pub(crate) fn retry(&self, incoming: proto::Incoming) -> Result<(), proto::RetryError> {
468 let mut state = self.state.lock().unwrap();
469 let mut response_buffer = Vec::new();
470 let transmit = state.inner.retry(incoming, &mut response_buffer)?;
471 respond(transmit, &response_buffer, &mut state.sender);
472 Ok(())
473 }
474
475 pub(crate) fn ignore(&self, incoming: proto::Incoming) {
476 let mut state = self.state.lock().unwrap();
477 state.stats.ignored_handshakes += 1;
478 state.inner.ignore(incoming);
479 }
480}
481
/// Mutable endpoint state, always accessed under the [`EndpointInner::state`] mutex.
#[derive(Debug)]
pub(crate) struct State {
    /// Socket currently used by the endpoint.
    socket: Box<dyn AsyncUdpSocket>,
    /// Sender used for endpoint-level responses (see `respond`).
    sender: Pin<Box<dyn UdpSender>>,
    /// Socket that was replaced by a rebind; still polled until `drive_recv` discards it.
    prev_socket: Option<Box<dyn AsyncUdpSocket>>,
    /// Protocol-level endpoint state machine.
    inner: proto::Endpoint,
    /// Receive-side state: queued incoming attempts, connection set, buffer and limiter.
    recv_state: RecvState,
    /// Waker of the driver task, stored on poll and taken when the driver must be woken.
    driver: Option<Waker>,
    /// Whether the current socket's local address is IPv6.
    ipv6: bool,
    /// Receiving end of the channel on which connections report endpoint events.
    events: mpsc::UnboundedReceiver<(ConnectionHandle, EndpointEvent)>,
    /// Number of live `EndpointRef` clones; incremented on clone, decremented on drop.
    ref_count: usize,
    /// Set once the `EndpointDriver` has been dropped.
    driver_lost: bool,
    /// Runtime used for timekeeping and handed to new connections.
    runtime: Arc<dyn Runtime>,
    /// Handshake counters.
    stats: EndpointStats,
}
500
/// Notification primitives shared by all endpoint handles and the driver.
#[derive(Debug)]
pub(crate) struct Shared {
    /// Notified when incoming attempts are queued, on close, and when the driver is dropped.
    incoming: Notify,
    /// Notified when the connection set becomes empty.
    idle: Notify,
}
506
507impl State {
    /// Receives datagrams from the endpoint's socket(s) within one work-limiter cycle.
    ///
    /// A socket retained from a rebind is polled first; an error from it only causes it to be
    /// discarded. Errors from the current socket are returned to the caller.
    ///
    /// Returns `Ok(true)` when polling the current socket stopped because the work limiter
    /// disallowed further work, i.e. the caller should arrange to be polled again.
    fn drive_recv(&mut self, cx: &mut Context, now: Instant) -> Result<bool, io::Error> {
        let get_time = || self.runtime.now();
        self.recv_state.recv_limiter.start_cycle(get_time);
        if let Some(socket) = &mut self.prev_socket {
            // The progress reported for the previous socket is deliberately unused; only
            // whether polling it failed matters.
            let poll_res = self.recv_state.poll_socket(
                cx,
                &mut self.inner,
                &mut **socket,
                &mut self.sender,
                &*self.runtime,
                now,
            );
            if poll_res.is_err() {
                self.prev_socket = None;
            }
        };
        let poll_res = self.recv_state.poll_socket(
            cx,
            &mut self.inner,
            &mut *self.socket,
            &mut self.sender,
            &*self.runtime,
            now,
        );
        // Close the limiter cycle before propagating a possible error.
        self.recv_state.recv_limiter.finish_cycle(get_time);
        let poll_res = poll_res?;
        if poll_res.received_connection_packet {
            // A packet for an existing connection arrived on the current socket, so the
            // previous socket is dropped.
            self.prev_socket = None;
        }
        Ok(poll_res.keep_going)
    }
542
543 fn handle_events(&mut self, cx: &mut Context, shared: &Shared) -> bool {
544 for _ in 0..IO_LOOP_BOUND {
545 let (ch, event) = match self.events.poll_recv(cx) {
546 Poll::Ready(Some(x)) => x,
547 Poll::Ready(None) => unreachable!("EndpointInner owns one sender"),
548 Poll::Pending => {
549 return false;
550 }
551 };
552
553 if event.is_drained() {
554 self.recv_state.connections.senders.remove(&ch);
555 if self.recv_state.connections.is_empty() {
556 shared.idle.notify_waiters();
557 }
558 }
559 let Some(event) = self.inner.handle_event(ch, event) else {
560 continue;
561 };
562 let _ = self
564 .recv_state
565 .connections
566 .senders
567 .get_mut(&ch)
568 .unwrap()
569 .send(ConnectionEvent::Proto(event));
570 }
571
572 true
573 }
574}
575
576impl Drop for State {
577 fn drop(&mut self) {
578 for incoming in self.recv_state.incoming.drain(..) {
579 self.inner.ignore(incoming);
580 }
581 }
582}
583
/// Makes a single attempt to send `transmit` through `sender`.
///
/// The payload is the first `transmit.size` bytes of `response_buffer`. The sender is polled
/// exactly once with a waker that does nothing, and the result of that poll is discarded: no
/// retry is scheduled here if the sender is not ready or reports an error.
///
/// # Panics
///
/// Panics if `transmit.size` exceeds `response_buffer.len()`.
fn respond(
    transmit: proto::Transmit,
    response_buffer: &[u8],
    sender: &mut Pin<Box<dyn UdpSender>>,
) {
    // A waker whose clone yields another no-op waker and whose wake, wake-by-ref and drop
    // functions do nothing.
    const NOOP: RawWaker = {
        const VTABLE: RawWakerVTable = RawWakerVTable::new(
            // clone
            |_| NOOP,
            // wake
            |_| {},
            // wake_by_ref
            |_| {},
            // drop
            |_| {},
        );
        RawWaker::new(std::ptr::null(), &VTABLE)
    };
    // SAFETY: none of the vtable functions dereference the (null) data pointer, and all of
    // them are trivially thread-safe, so the `RawWaker` contract is upheld.
    let waker = unsafe { Waker::from_raw(NOOP) };
    let mut cx = Context::from_waker(&waker);
    _ = sender.as_mut().poll_send(
        &udp_transmit(&transmit, &response_buffer[..transmit.size]),
        &mut cx,
    );
}
633
634#[inline]
635fn proto_ecn(ecn: udp::EcnCodepoint) -> proto::EcnCodepoint {
636 match ecn {
637 udp::EcnCodepoint::Ect0 => proto::EcnCodepoint::Ect0,
638 udp::EcnCodepoint::Ect1 => proto::EcnCodepoint::Ect1,
639 udp::EcnCodepoint::Ce => proto::EcnCodepoint::Ce,
640 }
641}
642
/// The endpoint's registry of connections and the channels used to talk to them.
#[derive(Debug)]
struct ConnectionSet {
    /// Per-connection channels on which the endpoint delivers `ConnectionEvent`s.
    senders: FxHashMap<ConnectionHandle, mpsc::UnboundedSender<ConnectionEvent>>,
    /// Sending half of the endpoint event channel; a clone is given to each new connection.
    sender: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    /// Error code and reason recorded by `Endpoint::close`; `None` while the endpoint is open.
    close: Option<(VarInt, Bytes)>,
}
652
653impl ConnectionSet {
654 fn insert(
655 &mut self,
656 handle: ConnectionHandle,
657 conn: proto::Connection,
658 sender: Pin<Box<dyn UdpSender>>,
659 runtime: Arc<dyn Runtime>,
660 ) -> Connecting {
661 let (send, recv) = mpsc::unbounded_channel();
662 if let Some((error_code, ref reason)) = self.close {
663 send.send(ConnectionEvent::Close {
664 error_code,
665 reason: reason.clone(),
666 })
667 .unwrap();
668 }
669 self.senders.insert(handle, send);
670 Connecting::new(handle, conn, self.sender.clone(), recv, sender, runtime)
671 }
672
673 fn is_empty(&self) -> bool {
674 self.senders.is_empty()
675 }
676}
677
/// Converts any socket address into an IPv6 socket address.
///
/// IPv6 addresses are returned unchanged. IPv4 addresses become IPv4-mapped IPv6 addresses
/// with the same port and with flow info and scope id set to zero.
pub(crate) fn ensure_ipv6(x: SocketAddr) -> SocketAddrV6 {
    let v4 = match x {
        SocketAddr::V6(v6) => return v6,
        SocketAddr::V4(v4) => v4,
    };
    SocketAddrV6::new(v4.ip().to_ipv6_mapped(), v4.port(), 0, 0)
}
684
pin_project! {
    /// Future returned by [`Endpoint::accept`].
    pub struct Accept<'a> {
        // Endpoint whose queue of incoming connection attempts is being awaited.
        endpoint: &'a Endpoint,
        // Pending notification on the endpoint's `incoming` notifier.
        #[pin]
        notify: Notified<'a>,
    }
}
693
impl Future for Accept<'_> {
    type Output = Option<Incoming>;

    /// Resolves to the next queued incoming connection attempt.
    ///
    /// Yields `None` if the driver has been lost, or if the endpoint has been closed and
    /// nothing is queued; an already queued attempt is still returned after a close.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut endpoint = this.endpoint.inner.state.lock().unwrap();
        if endpoint.driver_lost {
            return Poll::Ready(None);
        }
        if let Some(incoming) = endpoint.recv_state.incoming.pop_front() {
            // Cloning the `EndpointRef` below locks the state mutex again, so the guard must
            // be released first.
            drop(endpoint);
            let incoming = Incoming::new(incoming, this.endpoint.inner.clone());
            return Poll::Ready(Some(incoming));
        }
        if endpoint.recv_state.connections.close.is_some() {
            return Poll::Ready(None);
        }
        loop {
            match this.notify.as_mut().poll(ctx) {
                // Nothing is queued yet; the state lock is still held at this point.
                Poll::Pending => return Poll::Pending,
                // The notification fired although nothing was queued: arm a fresh one and
                // poll it so that this task is registered for the next notification.
                Poll::Ready(()) => this
                    .notify
                    .set(this.endpoint.inner.shared.incoming.notified()),
            }
        }
    }
}
723
/// Counted reference to the shared [`EndpointInner`].
///
/// On top of the `Arc`, clones and drops maintain `State::ref_count`, and the drop that brings
/// the count to zero wakes the driver.
#[derive(Debug)]
pub(crate) struct EndpointRef(Arc<EndpointInner>);
726
727impl EndpointRef {
728 pub(crate) fn new(
729 socket: Box<dyn AsyncUdpSocket>,
730 inner: proto::Endpoint,
731 ipv6: bool,
732 runtime: Arc<dyn Runtime>,
733 ) -> Self {
734 let (sender, events) = mpsc::unbounded_channel();
735 let recv_state = RecvState::new(sender, socket.max_receive_segments(), &inner);
736 let sender = socket.create_sender();
737 Self(Arc::new(EndpointInner {
738 shared: Shared {
739 incoming: Notify::new(),
740 idle: Notify::new(),
741 },
742 state: Mutex::new(State {
743 socket,
744 sender,
745 prev_socket: None,
746 inner,
747 ipv6,
748 events,
749 driver: None,
750 ref_count: 0,
751 driver_lost: false,
752 recv_state,
753 runtime,
754 stats: EndpointStats::default(),
755 }),
756 }))
757 }
758}
759
760impl Clone for EndpointRef {
761 fn clone(&self) -> Self {
762 self.0.state.lock().unwrap().ref_count += 1;
763 Self(self.0.clone())
764 }
765}
766
767impl Drop for EndpointRef {
768 fn drop(&mut self) {
769 let endpoint = &mut *self.0.state.lock().unwrap();
770 if let Some(x) = endpoint.ref_count.checked_sub(1) {
771 endpoint.ref_count = x;
772 if x == 0 {
773 if let Some(task) = endpoint.driver.take() {
776 task.wake();
777 }
778 }
779 }
780 }
781}
782
783impl std::ops::Deref for EndpointRef {
784 type Target = EndpointInner;
785 fn deref(&self) -> &Self::Target {
786 &self.0
787 }
788}
789
/// Receive-side state of the endpoint.
struct RecvState {
    /// Incoming connection attempts waiting to be picked up by `Endpoint::accept`.
    incoming: VecDeque<proto::Incoming>,
    /// Registered connections and their event channels.
    connections: ConnectionSet,
    /// Receive buffer, split into `BATCH_SIZE` equal chunks for each socket poll.
    recv_buf: Box<[u8]>,
    /// Limits how much receive work is done per driver poll.
    recv_limiter: WorkLimiter,
}
797
798impl RecvState {
799 fn new(
800 sender: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
801 max_receive_segments: usize,
802 endpoint: &proto::Endpoint,
803 ) -> Self {
804 let recv_buf = vec![
805 0;
806 endpoint.config().get_max_udp_payload_size().min(64 * 1024) as usize
807 * max_receive_segments
808 * BATCH_SIZE
809 ];
810 Self {
811 connections: ConnectionSet {
812 senders: FxHashMap::default(),
813 sender,
814 close: None,
815 },
816 incoming: VecDeque::new(),
817 recv_buf: recv_buf.into(),
818 recv_limiter: WorkLimiter::new(RECV_TIME_BOUND),
819 }
820 }
821
    /// Receives and dispatches datagrams from `socket` until it has nothing more to deliver,
    /// the work limiter disallows further work, or a fatal I/O error occurs.
    ///
    /// Each received datagram is handed to the protocol `endpoint`; depending on the result
    /// it is queued as a new incoming attempt (or refused if the endpoint is closed),
    /// forwarded to its connection, or answered directly through `sender`.
    ///
    /// The returned [`PollProgress`] reports whether any packet for an existing connection
    /// was seen and whether the caller should poll again (`keep_going`).
    ///
    /// # Errors
    ///
    /// Returns any socket error other than `ConnectionReset`.
    fn poll_socket(
        &mut self,
        cx: &mut Context,
        endpoint: &mut proto::Endpoint,
        socket: &mut dyn AsyncUdpSocket,
        sender: &mut Pin<Box<dyn UdpSender>>,
        runtime: &dyn Runtime,
        now: Instant,
    ) -> Result<PollProgress, io::Error> {
        let mut received_connection_packet = false;
        let mut metas = [RecvMeta::default(); BATCH_SIZE];
        // Split the receive buffer into `BATCH_SIZE` equally sized I/O slices.
        let mut iovs: [IoSliceMut; BATCH_SIZE] = {
            let mut bufs = self
                .recv_buf
                .chunks_mut(self.recv_buf.len() / BATCH_SIZE)
                .map(IoSliceMut::new);

            // `RecvState::new` sizes the buffer as a multiple of `BATCH_SIZE`, so exactly
            // `BATCH_SIZE` chunks are expected.
            std::array::from_fn(|_| bufs.next().expect("BATCH_SIZE elements"))
        };
        loop {
            match socket.poll_recv(cx, &mut iovs, &mut metas) {
                Poll::Ready(Ok(msgs)) => {
                    self.recv_limiter.record_work(msgs);
                    for (meta, buf) in metas.iter().zip(iovs.iter()).take(msgs) {
                        let mut data: BytesMut = buf[0..meta.len].into();
                        // One received message may hold several datagrams of `meta.stride`
                        // bytes each (the last one possibly shorter); handle them in turn.
                        // NOTE(review): assumes `meta.stride` is non-zero whenever `meta.len`
                        // is non-zero, otherwise this loop would not advance.
                        while !data.is_empty() {
                            let buf = data.split_to(meta.stride.min(data.len()));
                            let mut response_buffer = Vec::new();
                            match endpoint.handle(
                                now,
                                meta.addr,
                                meta.dst_ip,
                                meta.ecn.map(proto_ecn),
                                buf,
                                &mut response_buffer,
                            ) {
                                Some(DatagramEvent::NewConnection(incoming)) => {
                                    // Queue the attempt for `accept`, unless the endpoint has
                                    // been closed, in which case it is refused right away.
                                    if self.connections.close.is_none() {
                                        self.incoming.push_back(incoming);
                                    } else {
                                        let transmit =
                                            endpoint.refuse(incoming, &mut response_buffer);
                                        respond(transmit, &response_buffer, sender);
                                    }
                                }
                                Some(DatagramEvent::ConnectionEvent(handle, event)) => {
                                    received_connection_packet = true;
                                    // A send error only means the connection has been dropped
                                    // already; ignore it.
                                    let _ = self
                                        .connections
                                        .senders
                                        .get_mut(&handle)
                                        .unwrap()
                                        .send(ConnectionEvent::Proto(event));
                                }
                                Some(DatagramEvent::Response(transmit)) => {
                                    respond(transmit, &response_buffer, sender);
                                }
                                None => {}
                            }
                        }
                    }
                }
                Poll::Pending => {
                    // The socket has nothing more right now.
                    return Ok(PollProgress {
                        received_connection_packet,
                        keep_going: false,
                    });
                }
                // Connection resets are not treated as fatal; poll the socket again.
                Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::ConnectionReset => {
                    continue;
                }
                Poll::Ready(Err(e)) => {
                    return Err(e);
                }
            }
            // Stop once the limiter disallows more work, asking the caller to poll again.
            if !self.recv_limiter.allow_work(|| runtime.now()) {
                return Ok(PollProgress {
                    received_connection_packet,
                    keep_going: true,
                });
            }
        }
    }
911}
912
913impl fmt::Debug for RecvState {
914 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
915 f.debug_struct("RecvState")
916 .field("incoming", &self.incoming)
917 .field("connections", &self.connections)
918 .field("recv_limiter", &self.recv_limiter)
920 .finish_non_exhaustive()
921 }
922}
923
/// Outcome of one `RecvState::poll_socket` call.
#[derive(Default)]
struct PollProgress {
    /// Whether at least one datagram for an existing connection was received.
    received_connection_packet: bool,
    /// Whether polling stopped because of the work limit, so the caller should poll again.
    keep_going: bool,
}