iroh_quinn/runtime/
mod.rs

1#[cfg(any(feature = "runtime-tokio", feature = "runtime-smol"))]
2use std::sync::Arc;
3use std::{
4    fmt::{self, Debug},
5    future::Future,
6    io::{self, IoSliceMut},
7    net::SocketAddr,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12use udp::{RecvMeta, Transmit};
13
14use crate::Instant;
15
16/// Abstracts I/O and timer operations for runtime independence
17pub trait Runtime: Send + Sync + Debug + 'static {
18    /// Construct a timer that will expire at `i`
19    fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>>;
20    /// Drive `future` to completion in the background
21    #[track_caller]
22    fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
23    /// Convert `t` into the socket type used by this runtime
24    #[cfg(not(wasm_browser))]
25    fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>>;
26    /// Look up the current time
27    ///
28    /// Allows simulating the flow of time for testing.
29    fn now(&self) -> Instant {
30        Instant::now()
31    }
32}
33
34/// Abstract implementation of an async timer for runtime independence
35pub trait AsyncTimer: Send + Debug + 'static {
36    /// Update the timer to expire at `i`
37    fn reset(self: Pin<&mut Self>, i: Instant);
38    /// Check whether the timer has expired, and register to be woken if not
39    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>;
40}
41
42/// Abstract implementation of a UDP socket for runtime independence
43pub trait AsyncUdpSocket: Send + Sync + Debug + 'static {
44    /// Create a [`UdpSender`] that can register a single task for write-readiness notifications
45    /// and send a transmit, if ready.
46    ///
47    /// A `poll_send` method on a single object can usually store only one [`Waker`] at a time,
48    /// i.e. allow at most one caller to wait for an event. This method allows any number of
49    /// interested tasks to construct their own [`UdpSender`] object. They can all then wait for the
50    /// same event and be notified concurrently, because each [`UdpSender`] can store a separate
51    /// [`Waker`].
52    ///
53    /// [`Waker`]: std::task::Waker
54    fn create_sender(&self) -> Pin<Box<dyn UdpSender>>;
55
56    /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future
57    fn poll_recv(
58        &mut self,
59        cx: &mut Context,
60        bufs: &mut [IoSliceMut<'_>],
61        meta: &mut [RecvMeta],
62    ) -> Poll<io::Result<usize>>;
63
64    /// Look up the local IP address and port used by this socket
65    fn local_addr(&self) -> io::Result<SocketAddr>;
66
67    /// Maximum number of datagrams that might be described by a single [`RecvMeta`]
68    fn max_receive_segments(&self) -> usize {
69        1
70    }
71
72    /// Whether datagrams might get fragmented into multiple parts
73    ///
74    /// Sockets should prevent this for best performance. See e.g. the `IPV6_DONTFRAG` socket
75    /// option.
76    fn may_fragment(&self) -> bool {
77        true
78    }
79}
80
81/// An object for asynchronously writing to an associated [`AsyncUdpSocket`].
82///
83/// Any number of [`UdpSender`]s may exist for a single [`AsyncUdpSocket`]. Each [`UdpSender`] is
84/// responsible for notifying at most one task for send readiness.
85pub trait UdpSender: Send + Sync + Debug + 'static {
86    /// Send a UDP datagram, or register to be woken if sending may succeed in the future.
87    ///
88    /// Usually implementations of this will poll the socket for writability before trying to
89    /// write to them, and retry both if writing fails.
90    ///
91    /// Quinn will create multiple [`UdpSender`]s, one for each task it's using it from. Thus it's
92    /// important to poll the underlying socket in a way that doesn't overwrite wakers.
93    ///
94    /// A single [`UdpSender`] will be reused, even if `poll_send` returns `Poll::Ready` once,
95    /// unlike [`Future::poll`], so calling it again after readiness should not panic.
96    fn poll_send(
97        self: Pin<&mut Self>,
98        transmit: &Transmit,
99        cx: &mut Context,
100    ) -> Poll<io::Result<()>>;
101
102    /// Maximum number of datagrams that a [`Transmit`] may encode.
103    fn max_transmit_segments(&self) -> usize {
104        1
105    }
106}
107
108pin_project_lite::pin_project! {
109    /// A helper for constructing [`UdpSender`]s from an underlying `Socket` type.
110    ///
111    /// This struct implements [`UdpSender`] if `MakeWritableFn` produces a `WritableFut`.
112    ///
113    /// Also serves as a trick, since `WritableFut` doesn't need to be a named future,
114    /// it can be an anonymous async block, as long as `MakeWritableFn` produces that
115    /// anonymous async block type.
116    ///
117    /// The `UdpSenderHelper` generic type parameters don't need to named, as it will be
118    /// used in its dyn-compatible form as a `Pin<Box<dyn UdpSender>>`.
119    struct UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut> {
120        socket: Socket,
121        make_writable_fut_fn: MakeWritableFutFn,
122        #[pin]
123        writable_fut: Option<WritableFut>,
124    }
125}
126
127impl<Socket, MakeWritableFutFn, WritableFut> Debug
128    for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
129{
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        f.write_str("UdpSender")
132    }
133}
134
135impl<Socket, MakeWritableFutFn, WriteableFut>
136    UdpSenderHelper<Socket, MakeWritableFutFn, WriteableFut>
137{
138    /// Create helper that implements [`UdpSender`] from a socket.
139    ///
140    /// Additionally you need to provide what is essentially an async function
141    /// that resolves once the socket is write-ready.
142    ///
143    /// See also the bounds on this struct's [`UdpSender`] implementation.
144    #[cfg(any(feature = "runtime-smol", feature = "runtime-tokio",))]
145    fn new(inner: Socket, make_fut: MakeWritableFutFn) -> Self {
146        Self {
147            socket: inner,
148            make_writable_fut_fn: make_fut,
149            writable_fut: None,
150        }
151    }
152}
153
154impl<Socket, MakeWritableFutFn, WritableFut> super::UdpSender
155    for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
156where
157    Socket: UdpSenderHelperSocket,
158    MakeWritableFutFn: Fn(&Socket) -> WritableFut + Send + Sync + 'static,
159    WritableFut: Future<Output = io::Result<()>> + Send + Sync + 'static,
160{
161    fn poll_send(
162        self: Pin<&mut Self>,
163        transmit: &udp::Transmit,
164        cx: &mut Context,
165    ) -> Poll<io::Result<()>> {
166        let mut this = self.project();
167        loop {
168            if this.writable_fut.is_none() {
169                this.writable_fut
170                    .set(Some((this.make_writable_fut_fn)(this.socket)));
171            }
172            // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely
173            // obtain an `&mut WritableFut` after storing it in `self.writable_fut` when `self` is already behind `Pin`,
174            // and if we didn't store it then we wouldn't be able to keep it alive between
175            // `poll_send` calls.
176            let result =
177                std::task::ready!(this.writable_fut.as_mut().as_pin_mut().unwrap().poll(cx));
178
179            // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for
180            // a new `Future` to be created on the next call.
181            this.writable_fut.set(None);
182
183            // If .writable() fails, propagate the error
184            result?;
185
186            match this.socket.try_send(transmit) {
187                // We thought the socket was writable, but it wasn't, then retry so that either another
188                // `writable().await` call determines that the socket is indeed not writable and
189                // registers us for a wakeup, or the send succeeds if this really was just a
190                // transient failure.
191                Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
192                // In all other cases, either propagate the error or we're Ok
193                result => return Poll::Ready(result),
194            }
195        }
196    }
197
198    fn max_transmit_segments(&self) -> usize {
199        self.socket.max_transmit_segments()
200    }
201}
202
203/// Parts of the [`UdpSender`] trait that aren't asynchronous or require storing wakers.
204///
205/// This trait is used by [`UdpSenderHelper`] to help construct [`UdpSender`]s.
206trait UdpSenderHelperSocket: Send + Sync + 'static {
207    /// Try to send a transmit, if the socket happens to be write-ready.
208    ///
209    /// If not write-ready, this is allowed to return [`std::io::ErrorKind::WouldBlock`].
210    ///
211    /// The [`UdpSenderHelper`] will use this to implement [`UdpSender::poll_send`].
212    fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()>;
213
214    /// See [`UdpSender::max_transmit_segments`].
215    fn max_transmit_segments(&self) -> usize;
216}
217
218/// Automatically select an appropriate runtime from those enabled at compile time
219///
220/// If `runtime-tokio` is enabled and this function is called from within a Tokio runtime context,
221/// then `TokioRuntime` is returned. Otherwise, if `runtime-smol` is enabled, `SmolRuntime` is
222/// returned. Otherwise, `None` is returned.
223#[cfg(any(feature = "runtime-tokio", feature = "runtime-smol"))]
224#[allow(clippy::needless_return)] // Be sure we return the right thing
225pub fn default_runtime() -> Option<Arc<dyn Runtime>> {
226    #[cfg(feature = "runtime-tokio")]
227    {
228        if ::tokio::runtime::Handle::try_current().is_ok() {
229            return Some(Arc::new(TokioRuntime));
230        }
231    }
232
233    #[cfg(feature = "runtime-smol")]
234    {
235        return Some(Arc::new(SmolRuntime));
236    }
237
238    #[cfg(not(feature = "runtime-smol"))]
239    None
240}
241
242#[cfg(feature = "runtime-tokio")]
243mod tokio;
244#[cfg(feature = "runtime-tokio")]
245pub use tokio::TokioRuntime;
246
247#[cfg(feature = "runtime-smol")]
248mod smol;
249#[cfg(feature = "runtime-smol")]
250pub use smol::*;