iroh_quinn/runtime/mod.rs
1#[cfg(any(feature = "runtime-tokio", feature = "runtime-smol"))]
2use std::sync::Arc;
3use std::{
4 fmt::{self, Debug},
5 future::Future,
6 io::{self, IoSliceMut},
7 net::SocketAddr,
8 num::NonZeroUsize,
9 pin::Pin,
10 task::{Context, Poll},
11};
12
13use udp::{RecvMeta, Transmit};
14
15use crate::Instant;
16
17/// Abstracts I/O and timer operations for runtime independence
18pub trait Runtime: Send + Sync + Debug + 'static {
19 /// Construct a timer that will expire at `i`
20 fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>>;
21 /// Drive `future` to completion in the background
22 #[track_caller]
23 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
24 /// Convert `t` into the socket type used by this runtime
25 #[cfg(not(wasm_browser))]
26 fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>>;
27 /// Look up the current time
28 ///
29 /// Allows simulating the flow of time for testing.
30 fn now(&self) -> Instant {
31 Instant::now()
32 }
33}
34
35/// Abstract implementation of an async timer for runtime independence
36pub trait AsyncTimer: Send + Debug + 'static {
37 /// Update the timer to expire at `i`
38 fn reset(self: Pin<&mut Self>, i: Instant);
39 /// Check whether the timer has expired, and register to be woken if not
40 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>;
41}
42
43/// Abstract implementation of a UDP socket for runtime independence
44pub trait AsyncUdpSocket: Send + Sync + Debug + 'static {
45 /// Create a [`UdpSender`] that can register a single task for write-readiness notifications
46 /// and send a transmit, if ready.
47 ///
48 /// A `poll_send` method on a single object can usually store only one [`Waker`] at a time,
49 /// i.e. allow at most one caller to wait for an event. This method allows any number of
50 /// interested tasks to construct their own [`UdpSender`] object. They can all then wait for the
51 /// same event and be notified concurrently, because each [`UdpSender`] can store a separate
52 /// [`Waker`].
53 ///
54 /// [`Waker`]: std::task::Waker
55 fn create_sender(&self) -> Pin<Box<dyn UdpSender>>;
56
57 /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future
58 fn poll_recv(
59 &mut self,
60 cx: &mut Context,
61 bufs: &mut [IoSliceMut<'_>],
62 meta: &mut [RecvMeta],
63 ) -> Poll<io::Result<usize>>;
64
65 /// Look up the local IP address and port used by this socket
66 fn local_addr(&self) -> io::Result<SocketAddr>;
67
68 /// Maximum number of datagrams that might be described by a single [`RecvMeta`]
69 fn max_receive_segments(&self) -> NonZeroUsize {
70 NonZeroUsize::MIN
71 }
72
73 /// Whether datagrams might get fragmented into multiple parts
74 ///
75 /// Sockets should prevent this for best performance. See e.g. the `IPV6_DONTFRAG` socket
76 /// option.
77 fn may_fragment(&self) -> bool {
78 true
79 }
80}
81
82/// An object for asynchronously writing to an associated [`AsyncUdpSocket`].
83///
84/// Any number of [`UdpSender`]s may exist for a single [`AsyncUdpSocket`]. Each [`UdpSender`] is
85/// responsible for notifying at most one task for send readiness.
86pub trait UdpSender: Send + Sync + Debug + 'static {
87 /// Send a UDP datagram, or register to be woken if sending may succeed in the future.
88 ///
89 /// Usually implementations of this will poll the socket for writability before trying to
90 /// write to them, and retry both if writing fails.
91 ///
92 /// Quinn will create multiple [`UdpSender`]s, one for each task it's using it from. Thus it's
93 /// important to poll the underlying socket in a way that doesn't overwrite wakers.
94 ///
95 /// A single [`UdpSender`] will be reused, even if `poll_send` returns `Poll::Ready` once,
96 /// unlike [`Future::poll`], so calling it again after readiness should not panic.
97 fn poll_send(
98 self: Pin<&mut Self>,
99 transmit: &Transmit,
100 cx: &mut Context,
101 ) -> Poll<io::Result<()>>;
102
103 /// Maximum number of datagrams that a [`Transmit`] may encode.
104 fn max_transmit_segments(&self) -> NonZeroUsize {
105 NonZeroUsize::MIN
106 }
107}
108
109pin_project_lite::pin_project! {
110 /// A helper for constructing [`UdpSender`]s from an underlying `Socket` type.
111 ///
112 /// This struct implements [`UdpSender`] if `MakeWritableFn` produces a `WritableFut`.
113 ///
114 /// Also serves as a trick, since `WritableFut` doesn't need to be a named future,
115 /// it can be an anonymous async block, as long as `MakeWritableFn` produces that
116 /// anonymous async block type.
117 ///
118 /// The `UdpSenderHelper` generic type parameters don't need to named, as it will be
119 /// used in its dyn-compatible form as a `Pin<Box<dyn UdpSender>>`.
120 struct UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut> {
121 socket: Socket,
122 make_writable_fut_fn: MakeWritableFutFn,
123 #[pin]
124 writable_fut: Option<WritableFut>,
125 }
126}
127
128impl<Socket, MakeWritableFutFn, WritableFut> Debug
129 for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
130{
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 f.write_str("UdpSender")
133 }
134}
135
136impl<Socket, MakeWritableFutFn, WriteableFut>
137 UdpSenderHelper<Socket, MakeWritableFutFn, WriteableFut>
138{
139 /// Create helper that implements [`UdpSender`] from a socket.
140 ///
141 /// Additionally you need to provide what is essentially an async function
142 /// that resolves once the socket is write-ready.
143 ///
144 /// See also the bounds on this struct's [`UdpSender`] implementation.
145 #[cfg(any(feature = "runtime-smol", feature = "runtime-tokio",))]
146 fn new(inner: Socket, make_fut: MakeWritableFutFn) -> Self {
147 Self {
148 socket: inner,
149 make_writable_fut_fn: make_fut,
150 writable_fut: None,
151 }
152 }
153}
154
155impl<Socket, MakeWritableFutFn, WritableFut> super::UdpSender
156 for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
157where
158 Socket: UdpSenderHelperSocket,
159 MakeWritableFutFn: Fn(&Socket) -> WritableFut + Send + Sync + 'static,
160 WritableFut: Future<Output = io::Result<()>> + Send + Sync + 'static,
161{
162 fn poll_send(
163 self: Pin<&mut Self>,
164 transmit: &udp::Transmit,
165 cx: &mut Context,
166 ) -> Poll<io::Result<()>> {
167 let mut this = self.project();
168 loop {
169 if this.writable_fut.is_none() {
170 this.writable_fut
171 .set(Some((this.make_writable_fut_fn)(this.socket)));
172 }
173 // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely
174 // obtain an `&mut WritableFut` after storing it in `self.writable_fut` when `self` is already behind `Pin`,
175 // and if we didn't store it then we wouldn't be able to keep it alive between
176 // `poll_send` calls.
177 let result =
178 std::task::ready!(this.writable_fut.as_mut().as_pin_mut().unwrap().poll(cx));
179
180 // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for
181 // a new `Future` to be created on the next call.
182 this.writable_fut.set(None);
183
184 // If .writable() fails, propagate the error
185 result?;
186
187 match this.socket.try_send(transmit) {
188 // We thought the socket was writable, but it wasn't, then retry so that either another
189 // `writable().await` call determines that the socket is indeed not writable and
190 // registers us for a wakeup, or the send succeeds if this really was just a
191 // transient failure.
192 Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
193 // In all other cases, either propagate the error or we're Ok
194 result => return Poll::Ready(result),
195 }
196 }
197 }
198
199 fn max_transmit_segments(&self) -> NonZeroUsize {
200 self.socket.max_transmit_segments()
201 }
202}
203
204/// Parts of the [`UdpSender`] trait that aren't asynchronous or require storing wakers.
205///
206/// This trait is used by [`UdpSenderHelper`] to help construct [`UdpSender`]s.
207trait UdpSenderHelperSocket: Send + Sync + 'static {
208 /// Try to send a transmit, if the socket happens to be write-ready.
209 ///
210 /// If not write-ready, this is allowed to return [`std::io::ErrorKind::WouldBlock`].
211 ///
212 /// The [`UdpSenderHelper`] will use this to implement [`UdpSender::poll_send`].
213 fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()>;
214
215 /// See [`UdpSender::max_transmit_segments`].
216 fn max_transmit_segments(&self) -> NonZeroUsize;
217}
218
219/// Automatically select an appropriate runtime from those enabled at compile time
220///
221/// If `runtime-tokio` is enabled and this function is called from within a Tokio runtime context,
222/// then `TokioRuntime` is returned. Otherwise, if `runtime-smol` is enabled, `SmolRuntime` is
223/// returned. Otherwise, `None` is returned.
224#[cfg(any(feature = "runtime-tokio", feature = "runtime-smol"))]
225#[allow(clippy::needless_return)] // Be sure we return the right thing
226pub fn default_runtime() -> Option<Arc<dyn Runtime>> {
227 #[cfg(feature = "runtime-tokio")]
228 {
229 if ::tokio::runtime::Handle::try_current().is_ok() {
230 return Some(Arc::new(TokioRuntime));
231 }
232 }
233
234 #[cfg(feature = "runtime-smol")]
235 {
236 return Some(Arc::new(SmolRuntime));
237 }
238
239 #[cfg(not(feature = "runtime-smol"))]
240 None
241}
242
243#[cfg(feature = "runtime-tokio")]
244mod tokio;
245#[cfg(feature = "runtime-tokio")]
246pub use tokio::TokioRuntime;
247
248#[cfg(feature = "runtime-smol")]
249mod smol;
250#[cfg(feature = "runtime-smol")]
251pub use smol::*;