iroh_quinn/runtime/
mod.rs

1#[cfg(any(feature = "runtime-tokio", feature = "runtime-smol"))]
2use std::sync::Arc;
3use std::{
4    fmt::{self, Debug},
5    future::Future,
6    io::{self, IoSliceMut},
7    net::SocketAddr,
8    num::NonZeroUsize,
9    pin::Pin,
10    task::{Context, Poll},
11};
12
13use udp::{RecvMeta, Transmit};
14
15use crate::Instant;
16
/// Abstracts I/O and timer operations for runtime independence
///
/// Implementors must be `Send + Sync + Debug + 'static`, and every method takes `&self`, so a
/// single runtime value can be shared (for example behind an `Arc<dyn Runtime>`).
pub trait Runtime: Send + Sync + Debug + 'static {
    /// Construct a timer that will expire at `i`
    ///
    /// The timer is handed back as a pinned, boxed [`AsyncTimer`] trait object.
    fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>>;
    /// Drive `future` to completion in the background
    ///
    /// The future is boxed, `Send`, and resolves to `()`; nothing is returned to the caller.
    #[track_caller]
    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
    /// Convert `t` into the socket type used by this runtime
    ///
    /// This method is compiled out when the `wasm_browser` cfg is set.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] if the runtime cannot wrap the socket.
    #[cfg(not(wasm_browser))]
    fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>>;
    /// Look up the current time
    ///
    /// Allows simulating the flow of time for testing.
    ///
    /// The default implementation returns `Instant::now()`.
    fn now(&self) -> Instant {
        Instant::now()
    }
}
34
/// Abstract implementation of an async timer for runtime independence
///
/// Both methods take `self` as `Pin<&mut Self>`, so a timer can be driven through a
/// `Pin<Box<dyn AsyncTimer>>`.
pub trait AsyncTimer: Send + Debug + 'static {
    /// Update the timer to expire at `i`
    fn reset(self: Pin<&mut Self>, i: Instant);
    /// Check whether the timer has expired, and register to be woken if not
    ///
    /// Returns `Poll::Ready(())` once the timer has expired. Otherwise returns `Poll::Pending`
    /// after registering the waker from `cx`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>;
}
42
/// Abstract implementation of a UDP socket for runtime independence
///
/// Receiving happens directly on the socket via [`AsyncUdpSocket::poll_recv`]. Sending goes
/// through separate [`UdpSender`] objects obtained from [`AsyncUdpSocket::create_sender`].
pub trait AsyncUdpSocket: Send + Sync + Debug + 'static {
    /// Create a [`UdpSender`] that can register a single task for write-readiness notifications
    /// and send a transmit, if ready.
    ///
    /// A `poll_send` method on a single object can usually store only one [`Waker`] at a time,
    /// i.e. allow at most one caller to wait for an event. This method allows any number of
    /// interested tasks to construct their own [`UdpSender`] object. They can all then wait for the
    /// same event and be notified concurrently, because each [`UdpSender`] can store a separate
    /// [`Waker`].
    ///
    /// [`Waker`]: std::task::Waker
    fn create_sender(&self) -> Pin<Box<dyn UdpSender>>;

    /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future
    ///
    /// Takes `&mut self`, so only one caller at a time can poll the socket for incoming data.
    /// Received payloads are written into `bufs` and their metadata into `meta`.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of `bufs`/`meta` entries that
    /// were filled in -- confirm against the concrete socket implementations.
    ///
    /// # Errors
    ///
    /// Resolves to an [`io::Error`] if receiving fails.
    fn poll_recv(
        &mut self,
        cx: &mut Context,
        bufs: &mut [IoSliceMut<'_>],
        meta: &mut [RecvMeta],
    ) -> Poll<io::Result<usize>>;

    /// Look up the local IP address and port used by this socket
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] if the address cannot be determined.
    fn local_addr(&self) -> io::Result<SocketAddr>;

    /// Maximum number of datagrams that might be described by a single [`RecvMeta`]
    ///
    /// The default implementation returns [`NonZeroUsize::MIN`], i.e. one datagram.
    fn max_receive_segments(&self) -> NonZeroUsize {
        NonZeroUsize::MIN
    }

    /// Whether datagrams might get fragmented into multiple parts
    ///
    /// Sockets should prevent this for best performance. See e.g. the `IPV6_DONTFRAG` socket
    /// option.
    ///
    /// The default implementation returns `true`, so implementations that do prevent
    /// fragmentation have to override this method to report it.
    fn may_fragment(&self) -> bool {
        true
    }
}
81
/// An object for asynchronously writing to an associated [`AsyncUdpSocket`].
///
/// Any number of [`UdpSender`]s may exist for a single [`AsyncUdpSocket`]. Each [`UdpSender`] is
/// responsible for notifying at most one task for send readiness.
pub trait UdpSender: Send + Sync + Debug + 'static {
    /// Send a UDP datagram, or register to be woken if sending may succeed in the future.
    ///
    /// Usually implementations of this will poll the socket for writability before trying to
    /// write to it, and retry both if writing fails.
    ///
    /// Quinn will create multiple [`UdpSender`]s, one for each task it's using it from. Thus it's
    /// important to poll the underlying socket in a way that doesn't overwrite wakers.
    ///
    /// A single [`UdpSender`] will be reused, even if `poll_send` returns `Poll::Ready` once,
    /// unlike [`Future::poll`], so calling it again after readiness should not panic.
    ///
    /// # Errors
    ///
    /// Resolves to an [`io::Error`] if the send fails.
    fn poll_send(
        self: Pin<&mut Self>,
        transmit: &Transmit,
        cx: &mut Context,
    ) -> Poll<io::Result<()>>;

    /// Maximum number of datagrams that a [`Transmit`] may encode.
    ///
    /// The default implementation returns [`NonZeroUsize::MIN`], i.e. one datagram.
    fn max_transmit_segments(&self) -> NonZeroUsize {
        NonZeroUsize::MIN
    }
}
108
pin_project_lite::pin_project! {
    /// A helper for constructing [`UdpSender`]s from an underlying `Socket` type.
    ///
    /// This struct implements [`UdpSender`] if `MakeWritableFutFn` produces a `WritableFut`.
    ///
    /// This also serves as a trick: `WritableFut` doesn't need to be a named future type.
    /// It can be an anonymous async block, as long as `MakeWritableFutFn` produces that
    /// anonymous async block type.
    ///
    /// The `UdpSenderHelper` generic type parameters don't need to be named, as it will be
    /// used in its dyn-compatible form as a `Pin<Box<dyn UdpSender>>`.
    struct UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut> {
        // The socket that transmits are ultimately sent on.
        socket: Socket,
        // Factory that is called with `&socket` to obtain a fresh writability future.
        make_writable_fut_fn: MakeWritableFutFn,
        // The writability future currently being polled, if any. It is structurally pinned
        // because the future type may be `!Unpin`.
        #[pin]
        writable_fut: Option<WritableFut>,
    }
}
127
128impl<Socket, MakeWritableFutFn, WritableFut> Debug
129    for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
130{
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        f.write_str("UdpSender")
133    }
134}
135
136impl<Socket, MakeWritableFutFn, WriteableFut>
137    UdpSenderHelper<Socket, MakeWritableFutFn, WriteableFut>
138{
139    /// Create helper that implements [`UdpSender`] from a socket.
140    ///
141    /// Additionally you need to provide what is essentially an async function
142    /// that resolves once the socket is write-ready.
143    ///
144    /// See also the bounds on this struct's [`UdpSender`] implementation.
145    #[cfg(any(feature = "runtime-smol", feature = "runtime-tokio",))]
146    fn new(inner: Socket, make_fut: MakeWritableFutFn) -> Self {
147        Self {
148            socket: inner,
149            make_writable_fut_fn: make_fut,
150            writable_fut: None,
151        }
152    }
153}
154
155impl<Socket, MakeWritableFutFn, WritableFut> super::UdpSender
156    for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
157where
158    Socket: UdpSenderHelperSocket,
159    MakeWritableFutFn: Fn(&Socket) -> WritableFut + Send + Sync + 'static,
160    WritableFut: Future<Output = io::Result<()>> + Send + Sync + 'static,
161{
162    fn poll_send(
163        self: Pin<&mut Self>,
164        transmit: &udp::Transmit,
165        cx: &mut Context,
166    ) -> Poll<io::Result<()>> {
167        let mut this = self.project();
168        loop {
169            if this.writable_fut.is_none() {
170                this.writable_fut
171                    .set(Some((this.make_writable_fut_fn)(this.socket)));
172            }
173            // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely
174            // obtain an `&mut WritableFut` after storing it in `self.writable_fut` when `self` is already behind `Pin`,
175            // and if we didn't store it then we wouldn't be able to keep it alive between
176            // `poll_send` calls.
177            let result =
178                std::task::ready!(this.writable_fut.as_mut().as_pin_mut().unwrap().poll(cx));
179
180            // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for
181            // a new `Future` to be created on the next call.
182            this.writable_fut.set(None);
183
184            // If .writable() fails, propagate the error
185            result?;
186
187            match this.socket.try_send(transmit) {
188                // We thought the socket was writable, but it wasn't, then retry so that either another
189                // `writable().await` call determines that the socket is indeed not writable and
190                // registers us for a wakeup, or the send succeeds if this really was just a
191                // transient failure.
192                Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
193                // In all other cases, either propagate the error or we're Ok
194                result => return Poll::Ready(result),
195            }
196        }
197    }
198
199    fn max_transmit_segments(&self) -> NonZeroUsize {
200        self.socket.max_transmit_segments()
201    }
202}
203
/// The parts of the [`UdpSender`] trait that are neither asynchronous nor require storing wakers.
///
/// This trait is used by [`UdpSenderHelper`] to help construct [`UdpSender`]s.
trait UdpSenderHelperSocket: Send + Sync + 'static {
    /// Try to send a transmit, if the socket happens to be write-ready.
    ///
    /// If not write-ready, this is allowed to return [`std::io::ErrorKind::WouldBlock`].
    ///
    /// The [`UdpSenderHelper`] will use this to implement [`UdpSender::poll_send`]: a
    /// `WouldBlock` error makes it wait for writability again and retry, while any other result
    /// is returned to the caller as-is.
    fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()>;

    /// See [`UdpSender::max_transmit_segments`].
    ///
    /// Unlike the [`UdpSender`] method, this one has no default implementation.
    fn max_transmit_segments(&self) -> NonZeroUsize;
}
218
/// Pick a runtime from the ones compiled into this build
///
/// The choice is made in this order:
///
/// 1. With `runtime-tokio` enabled, `TokioRuntime` is returned if the caller is currently inside
///    a Tokio runtime context.
/// 2. Otherwise, with `runtime-smol` enabled, `SmolRuntime` is returned.
/// 3. Otherwise, `None` is returned.
#[cfg(any(feature = "runtime-tokio", feature = "runtime-smol"))]
#[allow(clippy::needless_return)] // Be sure we return the right thing
pub fn default_runtime() -> Option<Arc<dyn Runtime>> {
    #[cfg(feature = "runtime-tokio")]
    {
        let inside_tokio = ::tokio::runtime::Handle::try_current().is_ok();
        if inside_tokio {
            let runtime: Arc<dyn Runtime> = Arc::new(TokioRuntime);
            return Some(runtime);
        }
    }

    #[cfg(feature = "runtime-smol")]
    {
        let runtime: Arc<dyn Runtime> = Arc::new(SmolRuntime);
        return Some(runtime);
    }

    #[cfg(not(feature = "runtime-smol"))]
    None
}
242
243#[cfg(feature = "runtime-tokio")]
244mod tokio;
245#[cfg(feature = "runtime-tokio")]
246pub use tokio::TokioRuntime;
247
248#[cfg(feature = "runtime-smol")]
249mod smol;
250#[cfg(feature = "runtime-smol")]
251pub use smol::*;