iroh_quinn/runtime/
tokio.rs1use std::{
2 fmt::Debug,
3 future::Future,
4 io,
5 num::NonZeroUsize,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll, ready},
9 time::Instant,
10};
11
12use tokio::{
13 io::Interest,
14 time::{Sleep, sleep_until},
15};
16
17use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSenderHelper, UdpSenderHelperSocket};
18
/// A [`Runtime`] implementation backed by Tokio.
///
/// This is a zero-sized handle: timers, task spawning, UDP socket wrapping and
/// the clock are all delegated to the `tokio` crate by the trait implementation.
#[derive(Debug)]
pub struct TokioRuntime;
22
23impl Runtime for TokioRuntime {
24 fn new_timer(&self, t: Instant) -> Pin<Box<dyn AsyncTimer>> {
25 Box::pin(sleep_until(t.into()))
26 }
27
28 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
29 tokio::spawn(future);
30 }
31
32 fn wrap_udp_socket(&self, sock: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>> {
33 Ok(Box::new(UdpSocket {
34 inner: Arc::new(udp::UdpSocketState::new((&sock).into())?),
35 io: Arc::new(tokio::net::UdpSocket::from_std(sock)?),
36 }))
37 }
38
39 fn now(&self) -> Instant {
40 tokio::time::Instant::now().into_std()
41 }
42}
43
44impl AsyncTimer for Sleep {
45 fn reset(self: Pin<&mut Self>, t: Instant) {
46 Self::reset(self, t.into())
47 }
48 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
49 Future::poll(self, cx)
50 }
51}
52
53#[derive(Debug, Clone)]
54struct UdpSocket {
55 io: Arc<tokio::net::UdpSocket>,
56 inner: Arc<udp::UdpSocketState>,
57}
58
59impl UdpSenderHelperSocket for UdpSocket {
60 fn max_transmit_segments(&self) -> NonZeroUsize {
61 self.inner.max_gso_segments()
62 }
63
64 fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()> {
65 self.io.try_io(Interest::WRITABLE, || {
66 self.inner.send((&self.io).into(), transmit)
67 })
68 }
69}
70
71impl AsyncUdpSocket for UdpSocket {
72 fn create_sender(&self) -> Pin<Box<dyn super::UdpSender>> {
73 Box::pin(UdpSenderHelper::new(self.clone(), |socket: &Self| {
74 let socket = socket.clone();
75 async move { socket.io.writable().await }
76 }))
77 }
78
79 fn poll_recv(
80 &mut self,
81 cx: &mut Context,
82 bufs: &mut [std::io::IoSliceMut<'_>],
83 meta: &mut [udp::RecvMeta],
84 ) -> Poll<io::Result<usize>> {
85 loop {
86 ready!(self.io.poll_recv_ready(cx))?;
87 if let Ok(res) = self.io.try_io(Interest::READABLE, || {
88 self.inner.recv((&self.io).into(), bufs, meta)
89 }) {
90 return Poll::Ready(Ok(res));
91 }
92 }
93 }
94
95 fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
96 self.io.local_addr()
97 }
98
99 fn may_fragment(&self) -> bool {
100 self.inner.may_fragment()
101 }
102
103 fn max_receive_segments(&self) -> NonZeroUsize {
104 self.inner.gro_segments()
105 }
106}