iroh_quinn/runtime/mod.rs
1#[cfg(any(feature = "runtime-tokio", feature = "runtime-smol"))]
2use std::sync::Arc;
3use std::{
4 fmt::{self, Debug},
5 future::Future,
6 io::{self, IoSliceMut},
7 net::SocketAddr,
8 pin::Pin,
9 task::{Context, Poll},
10};
11
12use udp::{RecvMeta, Transmit};
13
14use crate::Instant;
15
16/// Abstracts I/O and timer operations for runtime independence
17pub trait Runtime: Send + Sync + Debug + 'static {
18 /// Construct a timer that will expire at `i`
19 fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>>;
20 /// Drive `future` to completion in the background
21 #[track_caller]
22 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
23 /// Convert `t` into the socket type used by this runtime
24 #[cfg(not(wasm_browser))]
25 fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Box<dyn AsyncUdpSocket>>;
26 /// Look up the current time
27 ///
28 /// Allows simulating the flow of time for testing.
29 fn now(&self) -> Instant {
30 Instant::now()
31 }
32}
33
34/// Abstract implementation of an async timer for runtime independence
35pub trait AsyncTimer: Send + Debug + 'static {
36 /// Update the timer to expire at `i`
37 fn reset(self: Pin<&mut Self>, i: Instant);
38 /// Check whether the timer has expired, and register to be woken if not
39 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>;
40}
41
42/// Abstract implementation of a UDP socket for runtime independence
43pub trait AsyncUdpSocket: Send + Sync + Debug + 'static {
44 /// Create a [`UdpSender`] that can register a single task for write-readiness notifications
45 /// and send a transmit, if ready.
46 ///
47 /// A `poll_send` method on a single object can usually store only one [`Waker`] at a time,
48 /// i.e. allow at most one caller to wait for an event. This method allows any number of
49 /// interested tasks to construct their own [`UdpSender`] object. They can all then wait for the
50 /// same event and be notified concurrently, because each [`UdpSender`] can store a separate
51 /// [`Waker`].
52 ///
53 /// [`Waker`]: std::task::Waker
54 fn create_sender(&self) -> Pin<Box<dyn UdpSender>>;
55
56 /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future
57 fn poll_recv(
58 &mut self,
59 cx: &mut Context,
60 bufs: &mut [IoSliceMut<'_>],
61 meta: &mut [RecvMeta],
62 ) -> Poll<io::Result<usize>>;
63
64 /// Look up the local IP address and port used by this socket
65 fn local_addr(&self) -> io::Result<SocketAddr>;
66
67 /// Maximum number of datagrams that might be described by a single [`RecvMeta`]
68 fn max_receive_segments(&self) -> usize {
69 1
70 }
71
72 /// Whether datagrams might get fragmented into multiple parts
73 ///
74 /// Sockets should prevent this for best performance. See e.g. the `IPV6_DONTFRAG` socket
75 /// option.
76 fn may_fragment(&self) -> bool {
77 true
78 }
79}
80
81/// An object for asynchronously writing to an associated [`AsyncUdpSocket`].
82///
83/// Any number of [`UdpSender`]s may exist for a single [`AsyncUdpSocket`]. Each [`UdpSender`] is
84/// responsible for notifying at most one task for send readiness.
85pub trait UdpSender: Send + Sync + Debug + 'static {
86 /// Send a UDP datagram, or register to be woken if sending may succeed in the future.
87 ///
88 /// Usually implementations of this will poll the socket for writability before trying to
89 /// write to them, and retry both if writing fails.
90 ///
91 /// Quinn will create multiple [`UdpSender`]s, one for each task it's using it from. Thus it's
92 /// important to poll the underlying socket in a way that doesn't overwrite wakers.
93 ///
94 /// A single [`UdpSender`] will be reused, even if `poll_send` returns `Poll::Ready` once,
95 /// unlike [`Future::poll`], so calling it again after readiness should not panic.
96 fn poll_send(
97 self: Pin<&mut Self>,
98 transmit: &Transmit,
99 cx: &mut Context,
100 ) -> Poll<io::Result<()>>;
101
102 /// Maximum number of datagrams that a [`Transmit`] may encode.
103 fn max_transmit_segments(&self) -> usize {
104 1
105 }
106}
107
108pin_project_lite::pin_project! {
109 /// A helper for constructing [`UdpSender`]s from an underlying `Socket` type.
110 ///
111 /// This struct implements [`UdpSender`] if `MakeWritableFn` produces a `WritableFut`.
112 ///
113 /// Also serves as a trick, since `WritableFut` doesn't need to be a named future,
114 /// it can be an anonymous async block, as long as `MakeWritableFn` produces that
115 /// anonymous async block type.
116 ///
117 /// The `UdpSenderHelper` generic type parameters don't need to named, as it will be
118 /// used in its dyn-compatible form as a `Pin<Box<dyn UdpSender>>`.
119 struct UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut> {
120 socket: Socket,
121 make_writable_fut_fn: MakeWritableFutFn,
122 #[pin]
123 writable_fut: Option<WritableFut>,
124 }
125}
126
127impl<Socket, MakeWritableFutFn, WritableFut> Debug
128 for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
129{
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 f.write_str("UdpSender")
132 }
133}
134
135impl<Socket, MakeWritableFutFn, WriteableFut>
136 UdpSenderHelper<Socket, MakeWritableFutFn, WriteableFut>
137{
138 /// Create helper that implements [`UdpSender`] from a socket.
139 ///
140 /// Additionally you need to provide what is essentially an async function
141 /// that resolves once the socket is write-ready.
142 ///
143 /// See also the bounds on this struct's [`UdpSender`] implementation.
144 #[cfg(any(feature = "runtime-smol", feature = "runtime-tokio",))]
145 fn new(inner: Socket, make_fut: MakeWritableFutFn) -> Self {
146 Self {
147 socket: inner,
148 make_writable_fut_fn: make_fut,
149 writable_fut: None,
150 }
151 }
152}
153
154impl<Socket, MakeWritableFutFn, WritableFut> super::UdpSender
155 for UdpSenderHelper<Socket, MakeWritableFutFn, WritableFut>
156where
157 Socket: UdpSenderHelperSocket,
158 MakeWritableFutFn: Fn(&Socket) -> WritableFut + Send + Sync + 'static,
159 WritableFut: Future<Output = io::Result<()>> + Send + Sync + 'static,
160{
161 fn poll_send(
162 self: Pin<&mut Self>,
163 transmit: &udp::Transmit,
164 cx: &mut Context,
165 ) -> Poll<io::Result<()>> {
166 let mut this = self.project();
167 loop {
168 if this.writable_fut.is_none() {
169 this.writable_fut
170 .set(Some((this.make_writable_fut_fn)(this.socket)));
171 }
172 // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely
173 // obtain an `&mut WritableFut` after storing it in `self.writable_fut` when `self` is already behind `Pin`,
174 // and if we didn't store it then we wouldn't be able to keep it alive between
175 // `poll_send` calls.
176 let result =
177 std::task::ready!(this.writable_fut.as_mut().as_pin_mut().unwrap().poll(cx));
178
179 // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for
180 // a new `Future` to be created on the next call.
181 this.writable_fut.set(None);
182
183 // If .writable() fails, propagate the error
184 result?;
185
186 match this.socket.try_send(transmit) {
187 // We thought the socket was writable, but it wasn't, then retry so that either another
188 // `writable().await` call determines that the socket is indeed not writable and
189 // registers us for a wakeup, or the send succeeds if this really was just a
190 // transient failure.
191 Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
192 // In all other cases, either propagate the error or we're Ok
193 result => return Poll::Ready(result),
194 }
195 }
196 }
197
198 fn max_transmit_segments(&self) -> usize {
199 self.socket.max_transmit_segments()
200 }
201}
202
203/// Parts of the [`UdpSender`] trait that aren't asynchronous or require storing wakers.
204///
205/// This trait is used by [`UdpSenderHelper`] to help construct [`UdpSender`]s.
206trait UdpSenderHelperSocket: Send + Sync + 'static {
207 /// Try to send a transmit, if the socket happens to be write-ready.
208 ///
209 /// If not write-ready, this is allowed to return [`std::io::ErrorKind::WouldBlock`].
210 ///
211 /// The [`UdpSenderHelper`] will use this to implement [`UdpSender::poll_send`].
212 fn try_send(&self, transmit: &udp::Transmit) -> io::Result<()>;
213
214 /// See [`UdpSender::max_transmit_segments`].
215 fn max_transmit_segments(&self) -> usize;
216}
217
218/// Automatically select an appropriate runtime from those enabled at compile time
219///
220/// If `runtime-tokio` is enabled and this function is called from within a Tokio runtime context,
221/// then `TokioRuntime` is returned. Otherwise, if `runtime-smol` is enabled, `SmolRuntime` is
222/// returned. Otherwise, `None` is returned.
223#[cfg(any(feature = "runtime-tokio", feature = "runtime-smol"))]
224#[allow(clippy::needless_return)] // Be sure we return the right thing
225pub fn default_runtime() -> Option<Arc<dyn Runtime>> {
226 #[cfg(feature = "runtime-tokio")]
227 {
228 if ::tokio::runtime::Handle::try_current().is_ok() {
229 return Some(Arc::new(TokioRuntime));
230 }
231 }
232
233 #[cfg(feature = "runtime-smol")]
234 {
235 return Some(Arc::new(SmolRuntime));
236 }
237
238 #[cfg(not(feature = "runtime-smol"))]
239 None
240}
241
242#[cfg(feature = "runtime-tokio")]
243mod tokio;
244#[cfg(feature = "runtime-tokio")]
245pub use tokio::TokioRuntime;
246
247#[cfg(feature = "runtime-smol")]
248mod smol;
249#[cfg(feature = "runtime-smol")]
250pub use smol::*;