iroh_quinn/runtime/
tokio.rs

1use std::{
2    fmt::Debug,
3    future::Future,
4    io,
5    num::NonZeroUsize,
6    pin::Pin,
7    sync::Arc,
8    task::{Context, Poll, ready},
9    time::Instant,
10};
11
12use tokio::{
13    io::Interest,
14    time::{Sleep, sleep_until},
15};
16
17use super::{AsyncTimer, AsyncUdpSocket, Runtime, UdpSenderHelper, UdpSenderHelperSocket};
18
/// The Tokio-backed runtime for Quinn.
///
/// A stateless marker type: every operation it offers is forwarded to the `tokio` crate.
#[derive(Debug)]
pub struct TokioRuntime;
22
23impl Runtime for TokioRuntime {
24    fn new_timer(&self, t: Instant) -> Pin<Box<dyn AsyncTimer>> {
25        Box::pin(sleep_until(t.into()))
26    }
27
28    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
29        tokio::spawn(future);
30    }
31
32    fn wrap_udp_socket(&self, sock: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>> {
33        Ok(Box::new(UdpSocket {
34            inner: Arc::new(udp::UdpSocketState::new((&sock).into())?),
35            io: Arc::new(tokio::net::UdpSocket::from_std(sock)?),
36        }))
37    }
38
39    fn now(&self) -> Instant {
40        tokio::time::Instant::now().into_std()
41    }
42}
43
44impl AsyncTimer for Sleep {
45    fn reset(self: Pin<&mut Self>, t: Instant) {
46        Self::reset(self, t.into())
47    }
48    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
49        Future::poll(self, cx)
50    }
51}
52
53#[derive(Debug, Clone)]
54struct UdpSocket {
55    io: Arc<tokio::net::UdpSocket>,
56    inner: Arc<udp::UdpSocketState>,
57}
58
59impl UdpSenderHelperSocket for UdpSocket {
60    fn max_transmit_segments(&self) -> NonZeroUsize {
61        self.inner.max_gso_segments()
62    }
63
64    fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()> {
65        self.io.try_io(Interest::WRITABLE, || {
66            self.inner.send((&self.io).into(), transmit)
67        })
68    }
69}
70
71impl AsyncUdpSocket for UdpSocket {
72    fn create_sender(&self) -> Pin<Box<dyn super::UdpSender>> {
73        Box::pin(UdpSenderHelper::new(self.clone(), |socket: &Self| {
74            let socket = socket.clone();
75            async move { socket.io.writable().await }
76        }))
77    }
78
79    fn poll_recv(
80        &mut self,
81        cx: &mut Context,
82        bufs: &mut [std::io::IoSliceMut<'_>],
83        meta: &mut [udp::RecvMeta],
84    ) -> Poll<io::Result<usize>> {
85        loop {
86            ready!(self.io.poll_recv_ready(cx))?;
87            if let Ok(res) = self.io.try_io(Interest::READABLE, || {
88                self.inner.recv((&self.io).into(), bufs, meta)
89            }) {
90                return Poll::Ready(Ok(res));
91            }
92        }
93    }
94
95    fn local_addr(&self) -> io::Result<std::net::SocketAddr> {
96        self.io.local_addr()
97    }
98
99    fn may_fragment(&self) -> bool {
100        self.inner.may_fragment()
101    }
102
103    fn max_receive_segments(&self) -> NonZeroUsize {
104        self.inner.gro_segments()
105    }
106}