iroh_quinn/connection.rs

1use std::{
2    any::Any,
3    fmt,
4    future::Future,
5    io,
6    net::{IpAddr, SocketAddr},
7    pin::Pin,
8    sync::{Arc, Weak},
9    task::{Context, Poll, Waker, ready},
10};
11
12use bytes::Bytes;
13use pin_project_lite::pin_project;
14use rustc_hash::FxHashMap;
15use thiserror::Error;
16use tokio::sync::{Notify, futures::Notified, mpsc, oneshot, watch};
17use tracing::{Instrument, Span, debug_span};
18
19use crate::{
20    ConnectionEvent, Duration, Instant, Path, VarInt,
21    endpoint::ensure_ipv6,
22    mutex::Mutex,
23    path::OpenPath,
24    recv_stream::RecvStream,
25    runtime::{AsyncTimer, Runtime, UdpSender},
26    send_stream::SendStream,
27    udp_transmit,
28};
29use proto::{
30    ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, PathError, PathEvent,
31    PathId, PathStats, PathStatus, Side, StreamEvent, StreamId, TransportError, TransportErrorCode,
32    congestion::Controller, iroh_hp,
33};
34
35/// In-progress connection attempt future
36#[derive(Debug)]
37pub struct Connecting {
38    conn: Option<ConnectionRef>,
39    connected: oneshot::Receiver<bool>,
40    handshake_data_ready: Option<oneshot::Receiver<()>>,
41}
42
43impl Connecting {
44    pub(crate) fn new(
45        handle: ConnectionHandle,
46        conn: proto::Connection,
47        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
48        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
49        sender: Pin<Box<dyn UdpSender>>,
50        runtime: Arc<dyn Runtime>,
51    ) -> Self {
52        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
53        let (on_connected_send, on_connected_recv) = oneshot::channel();
54
55        let conn = ConnectionRef(Arc::new(ConnectionInner {
56            state: Mutex::new(State::new(
57                conn,
58                handle,
59                endpoint_events,
60                conn_events,
61                on_handshake_data_send,
62                on_connected_send,
63                sender,
64                runtime.clone(),
65            )),
66            shared: Shared::default(),
67        }));
68
69        let driver = ConnectionDriver(conn.clone());
70        runtime.spawn(Box::pin(
71            async {
72                if let Err(e) = driver.await {
73                    tracing::error!("I/O error: {e}");
74                }
75            }
76            .instrument(Span::current()),
77        ));
78
79        Self {
80            conn: Some(conn),
81            connected: on_connected_recv,
82            handshake_data_ready: Some(on_handshake_data_recv),
83        }
84    }
85
86    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
87    ///
88    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
89    /// If so, the returned [`Connection`] can be used to send application data without waiting for
90    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
91    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
92    /// complete, at which point subsequently opened streams and written data will have full
93    /// cryptographic protection.
94    ///
95    /// ## Outgoing
96    ///
97    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
98    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
99    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
100    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
101    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
102    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
103    /// If it was rejected, the existence of streams opened and other application data sent prior
104    /// to the handshake completing will not be conveyed to the remote application, and local
105    /// operations on them will return `ZeroRttRejected` errors.
106    ///
107    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
108    /// relevant resumption state to be stored in the server, which servers may limit or lose for
109    /// various reasons including not persisting resumption state across server restarts.
110    ///
111    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
112    /// implementation's docs for 0-RTT pitfalls.
113    ///
114    /// ## Incoming
115    ///
116    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
117    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
118    ///
119    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
120    /// implementation's docs for 0-RTT pitfalls.
121    ///
122    /// ## Security
123    ///
124    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
125    /// replay attacks, and should therefore never invoke non-idempotent operations.
126    ///
127    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
128    /// before TLS client authentication has occurred, and should therefore not be used to send
129    /// data for which client authentication is being used.
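    ///
    /// # Examples
    ///
    /// A rough sketch of attempting 0-RTT and falling back to the full handshake, assuming
    /// the crate and its re-exports are available as `iroh_quinn`:
    ///
    /// ```no_run
    /// # async fn example(connecting: iroh_quinn::Connecting) -> Result<(), Box<dyn std::error::Error>> {
    /// match connecting.into_0rtt() {
    ///     Ok((connection, zero_rtt_accepted)) => {
    ///         // Data sent here is 0/0.5-RTT data, with the weakened guarantees
    ///         // described above.
    ///         let _send = connection.open_uni().await?;
    ///         // Resolves once the handshake completes; `true` means the peer actually
    ///         // accepted our 0-RTT data.
    ///         let _accepted: bool = zero_rtt_accepted.await;
    ///     }
    ///     Err(connecting) => {
    ///         // 0-RTT was not possible; wait for the ordinary handshake instead.
    ///         let _connection = connecting.await?;
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```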
130    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
131        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
132        // have to release it explicitly before returning `self` by value.
133        let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");
134
135        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
136        drop(conn);
137
138        if is_ok {
139            let conn = self.conn.take().unwrap();
140            Ok((Connection(conn), ZeroRttAccepted(self.connected)))
141        } else {
142            Err(self)
143        }
144    }
145
146    /// Parameters negotiated during the handshake
147    ///
148    /// The dynamic type returned is determined by the configured
149    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
150    /// be [`downcast`](Box::downcast) to a
151    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
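    ///
    /// # Examples
    ///
    /// A rough sketch of downcasting with the default `rustls` session; the
    /// `iroh_quinn::crypto::rustls::HandshakeData` path is assumed from the link above:
    ///
    /// ```no_run
    /// # async fn example(mut connecting: iroh_quinn::Connecting) -> Result<(), Box<dyn std::error::Error>> {
    /// let data = connecting.handshake_data().await?;
    /// if let Ok(rustls_data) = data.downcast::<iroh_quinn::crypto::rustls::HandshakeData>() {
    ///     // `rustls_data` exposes handshake details such as the negotiated ALPN protocol.
    ///     let _ = rustls_data;
    /// }
    /// # Ok(())
    /// # }
    /// ```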
152    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
153        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
154        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
155        // simple.
156        if let Some(x) = self.handshake_data_ready.take() {
157            let _ = x.await;
158        }
159        let conn = self.conn.as_ref().unwrap();
160        let inner = conn.state.lock("handshake");
161        inner
162            .inner
163            .crypto_session()
164            .handshake_data()
165            .ok_or_else(|| {
166                inner
167                    .error
168                    .clone()
169                    .expect("spurious handshake data ready notification")
170            })
171    }
172
173    /// The local IP address which was used when the peer established
174    /// the connection
175    ///
176    /// This can be different from the address the endpoint is bound to, in case
177    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
178    ///
179    /// This will return `None` for clients, or when the platform does not expose this
180    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
181    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
182    ///
183    /// Will panic if called after `poll` has returned `Ready`.
184    pub fn local_ip(&self) -> Option<IpAddr> {
185        let conn = self.conn.as_ref().unwrap();
186        let inner = conn.state.lock("local_ip");
187
188        inner.inner.local_ip()
189    }
190
191    /// The peer's UDP address
192    ///
193    /// Will panic if called after `poll` has returned `Ready`.
194    pub fn remote_address(&self) -> SocketAddr {
195        let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
196        // TODO: another unwrap
197        conn_ref
198            .state
199            .lock("remote_address")
200            .inner
201            .path_remote_address(PathId::ZERO)
202            .expect("path exists when connecting")
203    }
204}
205
206impl Future for Connecting {
207    type Output = Result<Connection, ConnectionError>;
208    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
209        Pin::new(&mut self.connected).poll(cx).map(|_| {
210            let conn = self.conn.take().unwrap();
211            let inner = conn.state.lock("connecting");
212            if inner.connected {
213                drop(inner);
214                Ok(Connection(conn))
215            } else {
216                Err(inner
217                    .error
218                    .clone()
219                    .expect("connected signaled without connection success or error"))
220            }
221        })
222    }
223}
224
225/// Future that completes when a connection is fully established
226///
227/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
228/// value is meaningless.
229pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
230
231impl Future for ZeroRttAccepted {
232    type Output = bool;
233    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
234        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
235    }
236}
237
238/// A future that drives protocol logic for a connection
239///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the stream-related interfaces.
242/// It also keeps track of outstanding timeouts for the `Connection`.
243///
244/// If the connection encounters an error condition, this future will yield an error. It will
245/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
246/// connection-related futures, this waits for the draining period to complete to ensure that
247/// packets still in flight from the peer are handled gracefully.
248#[must_use = "connection drivers must be spawned for their connections to function"]
249#[derive(Debug)]
250struct ConnectionDriver(ConnectionRef);
251
252impl Future for ConnectionDriver {
253    type Output = Result<(), io::Error>;
254
255    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
256        let conn = &mut *self.0.state.lock("poll");
257
258        let span = debug_span!("drive", id = conn.handle.0);
259        let _guard = span.enter();
260
261        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
262            conn.terminate(e, &self.0.shared);
263            return Poll::Ready(Ok(()));
264        }
265        let mut keep_going = conn.drive_transmit(cx)?;
266        // If a timer expires, there might be more to transmit. When we transmit something, we
267        // might need to reset a timer. Hence, we must loop until neither happens.
268        keep_going |= conn.drive_timer(cx);
269        conn.forward_endpoint_events();
270        conn.forward_app_events(&self.0.shared);
271
272        if !conn.inner.is_drained() {
273            if keep_going {
274                // If the connection hasn't processed all tasks, schedule it again
275                cx.waker().wake_by_ref();
276            } else {
277                conn.driver = Some(cx.waker().clone());
278            }
279            return Poll::Pending;
280        }
281        if conn.error.is_none() {
282            unreachable!("drained connections always have an error");
283        }
284        Poll::Ready(Ok(()))
285    }
286}
287
288/// A QUIC connection.
289///
290/// If all references to a connection (including every clone of the `Connection` handle, streams of
291/// incoming streams, and the various stream types) have been dropped, then the connection will be
292/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
293/// connection explicitly by calling [`Connection::close()`].
294///
295/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
296/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
297/// application. [`Connection::close()`] describes in more detail how to gracefully close a
298/// connection without losing application data.
299///
300/// May be cloned to obtain another handle to the same connection.
301///
302/// [`Connection::close()`]: Connection::close
303#[derive(Debug, Clone)]
304pub struct Connection(ConnectionRef);
305
306impl Connection {
307    /// Returns a weak reference to the inner connection struct.
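    ///
    /// # Examples
    ///
    /// Unlike a clone of the [`Connection`], the returned handle does not by itself keep
    /// the connection alive. A rough sketch, assuming the crate is used as `iroh_quinn`:
    ///
    /// ```no_run
    /// # fn example(connection: iroh_quinn::Connection) {
    /// let weak = connection.weak_handle();
    /// drop(connection);
    /// // With all strong handles gone the connection may close at any time; `upgrade`
    /// // tells us whether it is still there.
    /// if let Some(conn) = weak.upgrade() {
    ///     let _ = conn.stable_id();
    /// }
    /// # }
    /// ```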
308    pub fn weak_handle(&self) -> WeakConnectionHandle {
309        WeakConnectionHandle(Arc::downgrade(&self.0.0))
310    }
311
312    /// Initiate a new outgoing unidirectional stream.
313    ///
314    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
315    /// consequence, the peer won't be notified that a stream has been opened until the stream is
316    /// actually used.
317    pub fn open_uni(&self) -> OpenUni<'_> {
318        OpenUni {
319            conn: &self.0,
320            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
321        }
322    }
323
324    /// Initiate a new outgoing bidirectional stream.
325    ///
326    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
327    /// consequence, the peer won't be notified that a stream has been opened until the stream is
328    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
329    /// anything to [`SendStream`] will never succeed.
330    ///
331    /// [`open_bi()`]: crate::Connection::open_bi
332    /// [`SendStream`]: crate::SendStream
333    /// [`RecvStream`]: crate::RecvStream
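    ///
    /// # Examples
    ///
    /// A rough sketch; the stream helpers (`write_all`, `read_to_end`) are assumed from the
    /// companion [`SendStream`] and [`RecvStream`] APIs:
    ///
    /// ```no_run
    /// # async fn example(connection: iroh_quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let (mut send, mut recv) = connection.open_bi().await?;
    /// // Write something first: the peer only learns about the stream once data is sent.
    /// send.write_all(b"hello").await?;
    /// let _response = recv.read_to_end(64 * 1024).await?;
    /// # Ok(())
    /// # }
    /// ```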
334    pub fn open_bi(&self) -> OpenBi<'_> {
335        OpenBi {
336            conn: &self.0,
337            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
338        }
339    }
340
341    /// Accept the next incoming uni-directional stream
342    pub fn accept_uni(&self) -> AcceptUni<'_> {
343        AcceptUni {
344            conn: &self.0,
345            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
346        }
347    }
348
349    /// Accept the next incoming bidirectional stream
350    ///
351    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
352    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
353    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
354    ///
355    /// [`accept_bi()`]: crate::Connection::accept_bi
356    /// [`open_bi()`]: crate::Connection::open_bi
357    /// [`SendStream`]: crate::SendStream
358    /// [`RecvStream`]: crate::RecvStream
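    ///
    /// # Examples
    ///
    /// A rough sketch of the accepting side (stream helpers assumed as in [`open_bi()`]):
    ///
    /// ```no_run
    /// # async fn example(connection: iroh_quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// loop {
    ///     // Resolves only after the peer has opened the stream *and* written to it.
    ///     let (mut send, mut recv) = connection.accept_bi().await?;
    ///     let request = recv.read_to_end(64 * 1024).await?;
    ///     // Echo the request back for illustration.
    ///     send.write_all(&request).await?;
    /// }
    /// # }
    /// ```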
359    pub fn accept_bi(&self) -> AcceptBi<'_> {
360        AcceptBi {
361            conn: &self.0,
362            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
363        }
364    }
365
366    /// Receive an application datagram
367    pub fn read_datagram(&self) -> ReadDatagram<'_> {
368        ReadDatagram {
369            conn: &self.0,
370            notify: self.0.shared.datagram_received.notified(),
371        }
372    }
373
374    /// Opens a new path if no path exists yet for the remote address.
375    ///
376    /// Otherwise behaves exactly as [`open_path`].
377    ///
378    /// [`open_path`]: Self::open_path
379    pub fn open_path_ensure(
380        &self,
381        addr: SocketAddr,
382        initial_status: PathStatus,
383        rtt_hint: Option<Duration>,
384    ) -> OpenPath {
385        let mut state = self.0.state.lock("open_path");
386
        // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6;
        // if not, we do not support IPv6. We cannot access endpoint::State from here, but
        // either all our paths use IPv6 addresses or all use IPv4 addresses, so we infer
        // IPv6 support from an existing path.
391        let ipv6 = state
392            .inner
393            .paths()
394            .iter()
395            .filter_map(|id| {
396                state
397                    .inner
398                    .path_remote_address(*id)
399                    .map(|ip| ip.is_ipv6())
400                    .ok()
401            })
402            .next()
403            .unwrap_or_default();
404        if addr.is_ipv6() && !ipv6 {
405            return OpenPath::rejected(PathError::InvalidRemoteAddress(addr));
406        }
407        let addr = if ipv6 {
408            SocketAddr::V6(ensure_ipv6(addr))
409        } else {
410            addr
411        };
412
413        let now = state.runtime.now();
414        let open_res = state
415            .inner
416            .open_path_ensure(addr, initial_status, now, rtt_hint);
417        state.wake();
418        match open_res {
419            Ok((path_id, existed)) if existed => {
420                match state.open_path.get(&path_id).map(|tx| tx.subscribe()) {
421                    Some(recv) => OpenPath::new(path_id, recv, self.0.clone()),
422                    None => OpenPath::ready(path_id, self.0.clone()),
423                }
424            }
425            Ok((path_id, _)) => {
426                let (tx, rx) = watch::channel(Ok(()));
427                state.open_path.insert(path_id, tx);
428                drop(state);
429                OpenPath::new(path_id, rx, self.0.clone())
430            }
431            Err(err) => OpenPath::rejected(err),
432        }
433    }
434
435    /// Opens an additional path if the multipath extension is negotiated.
436    ///
437    /// The returned future completes once the path is either fully opened and ready to
438    /// carry application data, or if there was an error.
439    ///
    /// Dropping the returned future does not cancel the opening of the path; the
441    /// [`PathEvent::Opened`] event will still be emitted from [`Self::path_events`] if the
442    /// path opens.  The [`PathId`] for the events can be extracted from
443    /// [`OpenPath::path_id`].
444    ///
445    /// Failure to open a path can either occur immediately, before polling the returned
446    /// future, or at a later time.  If the failure is immediate [`OpenPath::path_id`] will
447    /// return `None` and the future will be ready immediately.  If the failure happens
448    /// later, a [`PathEvent`] will be emitted.
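    ///
    /// # Examples
    ///
    /// A rough sketch of opening an additional path; it assumes `PathStatus` is re-exported
    /// at the crate root like the other protocol types, and that `Available` is one of its
    /// variants:
    ///
    /// ```no_run
    /// # async fn example(connection: iroh_quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// use iroh_quinn::PathStatus;
    ///
    /// let remote = "203.0.113.7:4433".parse()?;
    /// // No RTT hint: let the connection start from its defaults.
    /// let open = connection.open_path(remote, PathStatus::Available, None);
    /// // Completes once the path is ready to carry application data, or reports an error.
    /// let _outcome = open.await;
    /// # Ok(())
    /// # }
    /// ```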
449    pub fn open_path(
450        &self,
451        addr: SocketAddr,
452        initial_status: PathStatus,
453        rtt_hint: Option<Duration>,
454    ) -> OpenPath {
455        let mut state = self.0.state.lock("open_path");
456
        // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6;
        // if not, we do not support IPv6. We cannot access endpoint::State from here, but
        // either all our paths use IPv6 addresses or all use IPv4 addresses, so we infer
        // IPv6 support from an existing path.
461        let ipv6 = state
462            .inner
463            .paths()
464            .iter()
465            .filter_map(|id| {
466                state
467                    .inner
468                    .path_remote_address(*id)
469                    .map(|ip| ip.is_ipv6())
470                    .ok()
471            })
472            .next()
473            .unwrap_or_default();
474        if addr.is_ipv6() && !ipv6 {
475            return OpenPath::rejected(PathError::InvalidRemoteAddress(addr));
476        }
477        let addr = if ipv6 {
478            SocketAddr::V6(ensure_ipv6(addr))
479        } else {
480            addr
481        };
482
483        let (on_open_path_send, on_open_path_recv) = watch::channel(Ok(()));
484        let now = state.runtime.now();
485        let open_res = state.inner.open_path(addr, initial_status, now, rtt_hint);
486        state.wake();
487        match open_res {
488            Ok(path_id) => {
489                state.open_path.insert(path_id, on_open_path_send);
490                drop(state);
491                OpenPath::new(path_id, on_open_path_recv, self.0.clone())
492            }
493            Err(err) => OpenPath::rejected(err),
494        }
495    }
496
497    /// Returns the [`Path`] structure of an open path
498    pub fn path(&self, id: PathId) -> Option<Path> {
499        // TODO(flub): Using this to know if the path still exists is... hacky.
500        self.0.state.lock("path").inner.path_status(id).ok()?;
501        Some(Path {
502            id,
503            conn: self.0.clone(),
504        })
505    }
506
507    /// A broadcast receiver of [`PathEvent`]s for all paths in this connection
508    pub fn path_events(&self) -> tokio::sync::broadcast::Receiver<PathEvent> {
509        self.0.state.lock("path_events").path_events.subscribe()
510    }
511
512    /// A broadcast receiver of [`iroh_hp::Event`]s for updates about server addresses
513    pub fn nat_traversal_updates(&self) -> tokio::sync::broadcast::Receiver<iroh_hp::Event> {
514        self.0
515            .state
516            .lock("nat_traversal_updates")
517            .nat_traversal_updates
518            .subscribe()
519    }
520
521    /// Wait for the connection to be closed for any reason
522    ///
523    /// Despite the return type's name, closed connections are often not an error condition at the
524    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
525    /// and [`ConnectionError::ApplicationClosed`].
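    ///
    /// # Examples
    ///
    /// A rough sketch of treating locally and application-initiated closes as routine:
    ///
    /// ```no_run
    /// # async fn example(connection: iroh_quinn::Connection) {
    /// use iroh_quinn::ConnectionError;
    ///
    /// match connection.closed().await {
    ///     // Closed by us, or cleanly by the peer's application: usually not a failure.
    ///     ConnectionError::LocallyClosed | ConnectionError::ApplicationClosed(_) => {}
    ///     // Anything else (timeouts, transport errors, ...) may deserve logging.
    ///     other => eprintln!("connection lost: {other}"),
    /// }
    /// # }
    /// ```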
526    pub async fn closed(&self) -> ConnectionError {
527        {
528            let conn = self.0.state.lock("closed");
529            if let Some(error) = conn.error.as_ref() {
530                return error.clone();
531            }
532            // Construct the future while the lock is held to ensure we can't miss a wakeup if
533            // the `Notify` is signaled immediately after we release the lock. `await` it after
534            // the lock guard is out of scope.
535            self.0.shared.closed.notified()
536        }
537        .await;
538        self.0
539            .state
540            .lock("closed")
541            .error
542            .as_ref()
543            .expect("closed without an error")
544            .clone()
545    }
546
547    /// Wait for the connection to be closed without keeping a strong reference to the connection
548    ///
549    /// Returns a future that resolves, once the connection is closed, to a tuple of
550    /// ([`ConnectionError`], [`ConnectionStats`]).
551    ///
552    /// Calling [`Self::closed`] keeps the connection alive until it is either closed locally via [`Connection::close`]
553    /// or closed by the remote peer. This function instead does not keep the connection itself alive,
554    /// so if all *other* clones of the connection are dropped, the connection will be closed implicitly even
555    /// if there are futures returned from this function still being awaited.
556    pub fn on_closed(&self) -> OnClosed {
557        let (tx, rx) = oneshot::channel();
558        self.0.state.lock("on_closed").on_closed.push(tx);
559        OnClosed {
560            conn: self.weak_handle(),
561            rx,
562        }
563    }
564
565    /// If the connection is closed, the reason why.
566    ///
567    /// Returns `None` if the connection is still open.
568    pub fn close_reason(&self) -> Option<ConnectionError> {
569        self.0.state.lock("close_reason").error.clone()
570    }
571
572    /// Close the connection immediately.
573    ///
574    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
575    /// more data is sent to the peer and the peer may drop buffered data upon receiving
576    /// the CONNECTION_CLOSE frame.
577    ///
578    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
579    ///
580    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
581    /// is preserved in full, it should be kept under 1KiB.
582    ///
583    /// # Gracefully closing a connection
584    ///
585    /// Only the peer last receiving application data can be certain that all data is
586    /// delivered. The only reliable action it can then take is to close the connection,
587    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
588    /// frame is very likely if both endpoints stay online long enough, and
589    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
590    /// remote peer will time out the connection, provided that the idle timeout is not
591    /// disabled.
592    ///
593    /// The sending side can not guarantee all stream data is delivered to the remote
594    /// application. It only knows the data is delivered to the QUIC stack of the remote
595    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
596    /// [`close()`] the remote endpoint may drop any data it received but is as yet
597    /// undelivered to the application, including data that was acknowledged as received to
598    /// the local endpoint.
599    ///
600    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
601    /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
602    /// [`close()`]: Connection::close
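    ///
    /// # Examples
    ///
    /// A rough sketch of the pattern described above, on the side that received the last of
    /// the application data:
    ///
    /// ```no_run
    /// # async fn example(endpoint: iroh_quinn::Endpoint, connection: iroh_quinn::Connection) {
    /// // All expected data has arrived; signal an orderly shutdown to the peer.
    /// connection.close(0u32.into(), b"done");
    /// // Give the final CONNECTION_CLOSE frame a chance to actually reach the peer.
    /// endpoint.wait_idle().await;
    /// # }
    /// ```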
603    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
604        let conn = &mut *self.0.state.lock("close");
605        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
606    }
607
    /// Wait for the handshake to be confirmed.
    ///
    /// As a server, which must be authenticated by clients, this happens when the
    /// handshake completes, upon receiving a TLS Finished message from the client. In
    /// return, the server sends a HANDSHAKE_DONE frame.
    ///
    /// As a client, this happens upon receiving a HANDSHAKE_DONE frame. At this point, the
    /// server has either accepted our authentication, or, if client authentication is not
    /// required, accepted our lack of authentication.
618    pub async fn handshake_confirmed(&self) -> Result<(), ConnectionError> {
619        {
620            let conn = self.0.state.lock("handshake_confirmed");
621            if let Some(error) = conn.error.as_ref() {
622                return Err(error.clone());
623            }
624            if conn.handshake_confirmed {
625                return Ok(());
626            }
627            // Construct the future while the lock is held to ensure we can't miss a wakeup if
628            // the `Notify` is signaled immediately after we release the lock. `await` it after
629            // the lock guard is out of scope.
630            self.0.shared.handshake_confirmed.notified()
631        }
632        .await;
633        if let Some(error) = self.0.state.lock("handshake_confirmed").error.as_ref() {
634            Err(error.clone())
635        } else {
636            Ok(())
637        }
638    }
639
640    /// Transmit `data` as an unreliable, unordered application datagram
641    ///
642    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
643    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
644    /// dictated by the peer.
645    ///
646    /// Previously queued datagrams which are still unsent may be discarded to make space for this
647    /// datagram, in order of oldest to newest.
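    ///
    /// # Examples
    ///
    /// A rough sketch that checks the current size limit before queueing a datagram:
    ///
    /// ```no_run
    /// # fn example(connection: iroh_quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// use bytes::Bytes;
    ///
    /// let payload = Bytes::from_static(b"ping");
    /// // `max_datagram_size` is `None` when datagrams are disabled or unsupported by the peer.
    /// if connection.max_datagram_size().is_some_and(|max| payload.len() <= max) {
    ///     connection.send_datagram(payload)?;
    /// }
    /// # Ok(())
    /// # }
    /// ```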
648    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
649        let conn = &mut *self.0.state.lock("send_datagram");
650        if let Some(ref x) = conn.error {
651            return Err(SendDatagramError::ConnectionLost(x.clone()));
652        }
653        use proto::SendDatagramError::*;
654        match conn.inner.datagrams().send(data, true) {
655            Ok(()) => {
656                conn.wake();
657                Ok(())
658            }
659            Err(e) => Err(match e {
660                Blocked(..) => unreachable!(),
661                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
662                Disabled => SendDatagramError::Disabled,
663                TooLarge => SendDatagramError::TooLarge,
664            }),
665        }
666    }
667
668    /// Transmit `data` as an unreliable, unordered application datagram
669    ///
670    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
671    /// conditions, which effectively prioritizes old datagrams over new datagrams.
672    ///
673    /// See [`send_datagram()`] for details.
674    ///
675    /// [`send_datagram()`]: Connection::send_datagram
676    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
677        SendDatagram {
678            conn: &self.0,
679            data: Some(data),
680            notify: self.0.shared.datagrams_unblocked.notified(),
681        }
682    }
683
684    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
685    ///
686    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
687    ///
688    /// This may change over the lifetime of a connection according to variation in the path MTU
689    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
690    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
691    ///
692    /// Not necessarily the maximum size of received datagrams.
693    ///
694    /// [`send_datagram()`]: Connection::send_datagram
695    pub fn max_datagram_size(&self) -> Option<usize> {
696        self.0
697            .state
698            .lock("max_datagram_size")
699            .inner
700            .datagrams()
701            .max_size()
702    }
703
704    /// Bytes available in the outgoing datagram buffer
705    ///
706    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
707    /// at most this size is guaranteed not to cause older datagrams to be dropped.
708    pub fn datagram_send_buffer_space(&self) -> usize {
709        self.0
710            .state
711            .lock("datagram_send_buffer_space")
712            .inner
713            .datagrams()
714            .send_buffer_space()
715    }
716
717    /// The side of the connection (client or server)
718    pub fn side(&self) -> Side {
719        self.0.state.lock("side").inner.side()
720    }
721
722    /// The peer's UDP address
723    ///
724    /// If [`ServerConfig::migration`] is `true`, clients may change addresses at will,
725    /// e.g. when switching to a cellular internet connection.
726    ///
727    /// If [`multipath`] is enabled this will return the address of *any*
728    /// path, and may not be consistent. Prefer [`Path::remote_address`] instead.
729    ///
730    /// [`ServerConfig::migration`]: crate::ServerConfig::migration
731    /// [`multipath`]: crate::TransportConfig::max_concurrent_multipath_paths
732    pub fn remote_address(&self) -> SocketAddr {
733        // TODO: an unwrap again
734        let state = self.0.state.lock("remote_address");
735        state
736            .inner
737            .paths()
738            .iter()
739            .filter_map(|id| state.inner.path_remote_address(*id).ok())
740            .next()
741            .unwrap()
742    }
743
744    /// The local IP address which was used when the peer established
745    /// the connection
746    ///
747    /// This can be different from the address the endpoint is bound to, in case
748    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
749    ///
750    /// This will return `None` for clients, or when the platform does not expose this
751    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
752    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
753    pub fn local_ip(&self) -> Option<IpAddr> {
754        self.0.state.lock("local_ip").inner.local_ip()
755    }
756
757    /// Current best estimate of this connection's latency (round-trip-time)
758    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
759        self.0.state.lock("rtt").inner.rtt(path_id)
760    }
761
762    /// Returns connection statistics
763    pub fn stats(&self) -> ConnectionStats {
764        self.0.state.lock("stats").inner.stats()
765    }
766
767    /// Returns path statistics
768    pub fn path_stats(&self, path_id: PathId) -> Option<PathStats> {
769        self.0.state.lock("path_stats").inner.path_stats(path_id)
770    }
771
772    /// Current state of the congestion control algorithm, for debugging purposes
773    pub fn congestion_state(&self, path_id: PathId) -> Option<Box<dyn Controller>> {
774        self.0
775            .state
776            .lock("congestion_state")
777            .inner
778            .congestion_state(path_id)
779            .map(|c| c.clone_box())
780    }
781
782    /// Parameters negotiated during the handshake
783    ///
784    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
786    /// the returned value.
787    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
789    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
790        self.0
791            .state
792            .lock("handshake_data")
793            .inner
794            .crypto_session()
795            .handshake_data()
796    }
797
798    /// Cryptographic identity of the peer
799    ///
800    /// The dynamic type returned is determined by the configured
801    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
802    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
803    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
804        self.0
805            .state
806            .lock("peer_identity")
807            .inner
808            .crypto_session()
809            .peer_identity()
810    }
811
812    /// A stable identifier for this connection
813    ///
814    /// Peer addresses and connection IDs can change, but this value will remain
815    /// fixed for the lifetime of the connection.
816    pub fn stable_id(&self) -> usize {
817        self.0.stable_id()
818    }
819
820    /// Update traffic keys spontaneously
821    ///
822    /// This primarily exists for testing purposes.
823    pub fn force_key_update(&self) {
824        self.0
825            .state
826            .lock("force_key_update")
827            .inner
828            .force_key_update()
829    }
830
831    /// Derive keying material from this connection's TLS session secrets.
832    ///
833    /// When both peers call this method with the same `label` and `context`
834    /// arguments and `output` buffers of equal length, they will get the
835    /// same sequence of bytes in `output`. These bytes are cryptographically
836    /// strong and pseudorandom, and are suitable for use as keying material.
837    ///
838    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
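    ///
    /// # Examples
    ///
    /// A rough sketch of deriving a 32-byte shared secret:
    ///
    /// ```no_run
    /// # fn example(connection: iroh_quinn::Connection) {
    /// let mut key = [0u8; 32];
    /// // With the same label, context, and output length on both sides, the peer derives
    /// // identical bytes.
    /// connection
    ///     .export_keying_material(&mut key, b"my-application key", b"context")
    ///     .expect("output length within the TLS exporter's limits");
    /// # }
    /// ```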
839    pub fn export_keying_material(
840        &self,
841        output: &mut [u8],
842        label: &[u8],
843        context: &[u8],
844    ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
845        self.0
846            .state
847            .lock("export_keying_material")
848            .inner
849            .crypto_session()
850            .export_keying_material(output, label, context)
851    }
852
853    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
854    ///
855    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
856    /// `count`s increase both minimum and worst-case memory consumption.
857    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
858        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
859        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
860        // May need to send MAX_STREAMS to make progress
861        conn.wake();
862    }
863
864    /// See [`proto::TransportConfig::send_window()`]
865    pub fn set_send_window(&self, send_window: u64) {
866        let mut conn = self.0.state.lock("set_send_window");
867        conn.inner.set_send_window(send_window);
868        conn.wake();
869    }
870
871    /// See [`proto::TransportConfig::receive_window()`]
872    pub fn set_receive_window(&self, receive_window: VarInt) {
873        let mut conn = self.0.state.lock("set_receive_window");
874        conn.inner.set_receive_window(receive_window);
875        conn.wake();
876    }
877
878    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
879    ///
880    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
881    /// `count`s increase both minimum and worst-case memory consumption.
882    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
883        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
884        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
885        // May need to send MAX_STREAMS to make progress
886        conn.wake();
887    }
888
    /// Tracks changes to our external address as reported by the peer.
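    ///
    /// # Examples
    ///
    /// A rough sketch of watching for updates via the returned Tokio watch receiver:
    ///
    /// ```no_run
    /// # async fn example(connection: iroh_quinn::Connection) {
    /// let mut addr_rx = connection.observed_external_addr();
    /// while addr_rx.changed().await.is_ok() {
    ///     if let Some(addr) = *addr_rx.borrow() {
    ///         println!("peer currently observes us at {addr}");
    ///     }
    /// }
    /// # }
    /// ```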
890    pub fn observed_external_addr(&self) -> watch::Receiver<Option<SocketAddr>> {
891        let conn = self.0.state.lock("external_addr");
892        conn.observed_external_addr.subscribe()
893    }
894
895    /// Is multipath enabled?
896    // TODO(flub): not a useful API, once we do real things with multipath we can remove
897    // this again.
898    pub fn is_multipath_enabled(&self) -> bool {
899        let conn = self.0.state.lock("is_multipath_enabled");
900        conn.inner.is_multipath_negotiated()
901    }
902
903    /// Registers one address at which this endpoint might be reachable
904    ///
905    /// When the NAT traversal extension is negotiated, servers send these addresses to clients in
906    /// `ADD_ADDRESS` frames. This allows clients to obtain server address candidates to initiate
907    /// NAT traversal attempts. Clients provide their own reachable addresses in `REACH_OUT` frames
908    /// when [`Self::initiate_nat_traversal_round`] is called.
909    pub fn add_nat_traversal_address(&self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
910        let mut conn = self.0.state.lock("add_nat_traversal_addresses");
911        conn.inner.add_nat_traversal_address(address)
912    }
913
914    /// Removes one or more addresses from the set of addresses at which this endpoint is reachable
915    ///
916    /// When the NAT traversal extension is negotiated, servers send address removals to
917    /// clients in `REMOVE_ADDRESS` frames. This allows clients to stop using outdated
918    /// server address candidates that are no longer valid for NAT traversal.
919    ///
920    /// For clients, removed addresses will no longer be advertised in `REACH_OUT` frames.
921    ///
922    /// Addresses not present in the set will be silently ignored.
923    pub fn remove_nat_traversal_address(&self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
924        let mut conn = self.0.state.lock("remove_nat_traversal_addresses");
925        conn.inner.remove_nat_traversal_address(address)
926    }
927
928    /// Get the current local nat traversal addresses
929    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
930        let conn = self.0.state.lock("get_local_nat_traversal_addresses");
931        conn.inner.get_local_nat_traversal_addresses()
932    }
933
934    /// Get the currently advertised nat traversal addresses by the server
935    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
936        let conn = self.0.state.lock("get_remote_nat_traversal_addresses");
937        conn.inner.get_remote_nat_traversal_addresses()
938    }
939
940    /// Initiates a new nat traversal round
941    ///
942    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
943    /// frames, and initiating probing of the known remote addresses. When a new round is
944    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
945    ///
946    /// Returns the server addresses that are now being probed.
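    ///
    /// # Examples
    ///
    /// A rough sketch of starting a round without any RTT hints:
    ///
    /// ```no_run
    /// # fn example(connection: iroh_quinn::Connection) {
    /// // The closure may return an RTT estimate per candidate address; `None` leaves the
    /// // connection to use its defaults.
    /// if let Ok(probing) = connection.initiate_nat_traversal_round(|_addr| None) {
    ///     println!("probing {} server addresses", probing.len());
    /// }
    /// # }
    /// ```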
947    pub fn initiate_nat_traversal_round<F>(
948        &self,
949        rtt_hints: F,
950    ) -> Result<Vec<SocketAddr>, iroh_hp::Error>
951    where
952        F: Fn(&SocketAddr) -> Option<Duration>,
953    {
954        let mut conn = self.0.state.lock("initiate_nat_traversal_round");
955        let now = conn.runtime.now();
956        conn.inner.initiate_nat_traversal_round(now, rtt_hints)
957    }
958}
959
960pin_project! {
961    /// Future produced by [`Connection::open_uni`]
962    pub struct OpenUni<'a> {
963        conn: &'a ConnectionRef,
964        #[pin]
965        notify: Notified<'a>,
966    }
967}
968
969impl Future for OpenUni<'_> {
970    type Output = Result<SendStream, ConnectionError>;
971    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
972        let this = self.project();
973        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
974        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
975    }
976}
977
978pin_project! {
979    /// Future produced by [`Connection::open_bi`]
980    pub struct OpenBi<'a> {
981        conn: &'a ConnectionRef,
982        #[pin]
983        notify: Notified<'a>,
984    }
985}
986
987impl Future for OpenBi<'_> {
988    type Output = Result<(SendStream, RecvStream), ConnectionError>;
989    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
990        let this = self.project();
991        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
992
993        Poll::Ready(Ok((
994            SendStream::new(conn.clone(), id, is_0rtt),
995            RecvStream::new(conn, id, is_0rtt),
996        )))
997    }
998}
999
1000fn poll_open<'a>(
1001    ctx: &mut Context<'_>,
1002    conn: &'a ConnectionRef,
1003    mut notify: Pin<&mut Notified<'a>>,
1004    dir: Dir,
1005) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1006    let mut state = conn.state.lock("poll_open");
1007    if let Some(ref e) = state.error {
1008        return Poll::Ready(Err(e.clone()));
1009    } else if let Some(id) = state.inner.streams().open(dir) {
1010        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
1011        drop(state); // Release the lock so clone can take it
1012        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1013    }
1014    loop {
1015        match notify.as_mut().poll(ctx) {
1016            // `state` lock ensures we didn't race with readiness
1017            Poll::Pending => return Poll::Pending,
1018            // Spurious wakeup, get a new future
1019            Poll::Ready(()) => {
1020                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
1021            }
1022        }
1023    }
1024}
1025
1026pin_project! {
1027    /// Future produced by [`Connection::accept_uni`]
1028    pub struct AcceptUni<'a> {
1029        conn: &'a ConnectionRef,
1030        #[pin]
1031        notify: Notified<'a>,
1032    }
1033}
1034
1035impl Future for AcceptUni<'_> {
1036    type Output = Result<RecvStream, ConnectionError>;
1037
1038    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1039        let this = self.project();
1040        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
1041        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
1042    }
1043}
1044
1045pin_project! {
1046    /// Future produced by [`Connection::accept_bi`]
1047    pub struct AcceptBi<'a> {
1048        conn: &'a ConnectionRef,
1049        #[pin]
1050        notify: Notified<'a>,
1051    }
1052}
1053
1054impl Future for AcceptBi<'_> {
1055    type Output = Result<(SendStream, RecvStream), ConnectionError>;
1056
1057    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1058        let this = self.project();
1059        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
1060        Poll::Ready(Ok((
1061            SendStream::new(conn.clone(), id, is_0rtt),
1062            RecvStream::new(conn, id, is_0rtt),
1063        )))
1064    }
1065}
1066
1067fn poll_accept<'a>(
1068    ctx: &mut Context<'_>,
1069    conn: &'a ConnectionRef,
1070    mut notify: Pin<&mut Notified<'a>>,
1071    dir: Dir,
1072) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1073    let mut state = conn.state.lock("poll_accept");
1074    // Check for incoming streams before checking `state.error` so that already-received streams,
1075    // which are necessarily finite, can be drained from a closed connection.
1076    if let Some(id) = state.inner.streams().accept(dir) {
1077        let is_0rtt = state.inner.is_handshaking();
1078        state.wake(); // To send additional stream ID credit
1079        drop(state); // Release the lock so clone can take it
1080        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1081    } else if let Some(ref e) = state.error {
1082        return Poll::Ready(Err(e.clone()));
1083    }
1084    loop {
1085        match notify.as_mut().poll(ctx) {
1086            // `state` lock ensures we didn't race with readiness
1087            Poll::Pending => return Poll::Pending,
1088            // Spurious wakeup, get a new future
1089            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
1090        }
1091    }
1092}
1093
1094pin_project! {
1095    /// Future produced by [`Connection::read_datagram`]
1096    pub struct ReadDatagram<'a> {
1097        conn: &'a ConnectionRef,
1098        #[pin]
1099        notify: Notified<'a>,
1100    }
1101}
1102
1103impl Future for ReadDatagram<'_> {
1104    type Output = Result<Bytes, ConnectionError>;
1105    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1106        let mut this = self.project();
1107        let mut state = this.conn.state.lock("ReadDatagram::poll");
1108        // Check for buffered datagrams before checking `state.error` so that already-received
1109        // datagrams, which are necessarily finite, can be drained from a closed connection.
1110        if let Some(x) = state.inner.datagrams().recv() {
1111            return Poll::Ready(Ok(x));
1112        } else if let Some(ref e) = state.error {
1113            return Poll::Ready(Err(e.clone()));
1114        }
1115        loop {
1116            match this.notify.as_mut().poll(ctx) {
1117                // `state` lock ensures we didn't race with readiness
1118                Poll::Pending => return Poll::Pending,
1119                // Spurious wakeup, get a new future
1120                Poll::Ready(()) => this
1121                    .notify
1122                    .set(this.conn.shared.datagram_received.notified()),
1123            }
1124        }
1125    }
1126}
1127
1128pin_project! {
1129    /// Future produced by [`Connection::send_datagram_wait`]
1130    pub struct SendDatagram<'a> {
1131        conn: &'a ConnectionRef,
1132        data: Option<Bytes>,
1133        #[pin]
1134        notify: Notified<'a>,
1135    }
1136}
1137
1138impl Future for SendDatagram<'_> {
1139    type Output = Result<(), SendDatagramError>;
1140    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1141        let mut this = self.project();
1142        let mut state = this.conn.state.lock("SendDatagram::poll");
1143        if let Some(ref e) = state.error {
1144            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
1145        }
1146        use proto::SendDatagramError::*;
1147        match state
1148            .inner
1149            .datagrams()
1150            .send(this.data.take().unwrap(), false)
1151        {
1152            Ok(()) => {
1153                state.wake();
1154                Poll::Ready(Ok(()))
1155            }
1156            Err(e) => Poll::Ready(Err(match e {
1157                Blocked(data) => {
1158                    this.data.replace(data);
1159                    loop {
1160                        match this.notify.as_mut().poll(ctx) {
1161                            Poll::Pending => return Poll::Pending,
1162                            // Spurious wakeup, get a new future
1163                            Poll::Ready(()) => this
1164                                .notify
1165                                .set(this.conn.shared.datagrams_unblocked.notified()),
1166                        }
1167                    }
1168                }
1169                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1170                Disabled => SendDatagramError::Disabled,
1171                TooLarge => SendDatagramError::TooLarge,
1172            })),
1173        }
1174    }
1175}
1176
1177/// Future returned by [`Connection::on_closed`]
1178///
1179/// Resolves to a tuple of ([`ConnectionError`], [`ConnectionStats`]).
1180pub struct OnClosed {
1181    rx: oneshot::Receiver<(ConnectionError, ConnectionStats)>,
1182    conn: WeakConnectionHandle,
1183}
1184
1185impl Drop for OnClosed {
1186    fn drop(&mut self) {
1187        if self.rx.is_terminated() {
1188            return;
1189        };
1190        if let Some(conn) = self.conn.upgrade() {
1191            self.rx.close();
1192            conn.0
1193                .state
1194                .lock("OnClosed::drop")
1195                .on_closed
1196                .retain(|tx| !tx.is_closed());
1197        }
1198    }
1199}
1200
1201impl Future for OnClosed {
1202    type Output = (ConnectionError, ConnectionStats);
1203
1204    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1205        let this = self.get_mut();
1206        // The `expect` is safe because `State::drop` ensures that all senders are triggered
1207        // before being dropped.
1208        Pin::new(&mut this.rx)
1209            .poll(cx)
1210            .map(|x| x.expect("on_close sender is never dropped before sending"))
1211    }
1212}
1213
1214#[derive(Debug)]
1215pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
1216
1217impl ConnectionRef {
1218    fn from_arc(inner: Arc<ConnectionInner>) -> Self {
1219        inner.state.lock("from_arc").ref_count += 1;
1220        Self(inner)
1221    }
1222
1223    fn stable_id(&self) -> usize {
1224        &*self.0 as *const _ as usize
1225    }
1226}
1227
1228impl Clone for ConnectionRef {
1229    fn clone(&self) -> Self {
1230        Self::from_arc(Arc::clone(&self.0))
1231    }
1232}
1233
1234impl Drop for ConnectionRef {
1235    fn drop(&mut self) {
1236        let conn = &mut *self.state.lock("drop");
1237        if let Some(x) = conn.ref_count.checked_sub(1) {
1238            conn.ref_count = x;
1239            if x == 0 && !conn.inner.is_closed() {
1240                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1241                // not, we can't do any harm. If there were any streams being opened, then either
1242                // the connection will be closed for an unrelated reason or a fresh reference will
1243                // be constructed for the newly opened stream.
1244                conn.implicit_close(&self.shared);
1245            }
1246        }
1247    }
1248}
1249
1250impl std::ops::Deref for ConnectionRef {
1251    type Target = ConnectionInner;
1252    fn deref(&self) -> &Self::Target {
1253        &self.0
1254    }
1255}
1256
1257#[derive(Debug)]
1258pub(crate) struct ConnectionInner {
1259    pub(crate) state: Mutex<State>,
1260    pub(crate) shared: Shared,
1261}
1262
1263/// A handle to some connection internals, use with care.
1264///
1265/// This contains a weak reference to the connection so will not itself keep the connection
1266/// alive.
1267#[derive(Debug, Clone)]
1268pub struct WeakConnectionHandle(Weak<ConnectionInner>);
1269
1270impl WeakConnectionHandle {
1271    /// Returns `true` if the [`Connection`] associated with this handle is still alive.
1272    pub fn is_alive(&self) -> bool {
1273        self.0.upgrade().is_some()
1274    }
1275
1276    /// Upgrade the handle to a full `Connection`
1277    pub fn upgrade(&self) -> Option<Connection> {
1278        self.0
1279            .upgrade()
1280            .map(|inner| Connection(ConnectionRef::from_arc(inner)))
1281    }
1282}
1283
1284#[derive(Debug, Default)]
1285pub(crate) struct Shared {
1286    handshake_confirmed: Notify,
1287    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
1288    /// control budget
1289    stream_budget_available: [Notify; 2],
1290    /// Notified when the peer has initiated a new stream
1291    stream_incoming: [Notify; 2],
1292    datagram_received: Notify,
1293    datagrams_unblocked: Notify,
1294    closed: Notify,
1295}
1296
1297pub(crate) struct State {
1298    pub(crate) inner: proto::Connection,
1299    driver: Option<Waker>,
1300    handle: ConnectionHandle,
1301    on_handshake_data: Option<oneshot::Sender<()>>,
1302    on_connected: Option<oneshot::Sender<bool>>,
1303    connected: bool,
1304    handshake_confirmed: bool,
1305    timer: Option<Pin<Box<dyn AsyncTimer>>>,
1306    timer_deadline: Option<Instant>,
1307    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1308    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1309    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1310    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1311    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1312    /// Always set to Some before the connection becomes drained
1313    pub(crate) error: Option<ConnectionError>,
1314    /// Tracks paths being opened
1315    open_path: FxHashMap<PathId, watch::Sender<Result<(), PathError>>>,
1316    /// Tracks paths being closed
1317    pub(crate) close_path: FxHashMap<PathId, oneshot::Sender<VarInt>>,
1318    pub(crate) path_events: tokio::sync::broadcast::Sender<PathEvent>,
1319    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
1320    ref_count: usize,
1321    sender: Pin<Box<dyn UdpSender>>,
1322    pub(crate) runtime: Arc<dyn Runtime>,
1323    send_buffer: Vec<u8>,
1324    /// We buffer a transmit when the underlying I/O would block
1325    buffered_transmit: Option<proto::Transmit>,
1326    /// Our last external address reported by the peer. When multipath is enabled, this will be the
1327    /// last report across all paths.
1328    pub(crate) observed_external_addr: watch::Sender<Option<SocketAddr>>,
1329    pub(crate) nat_traversal_updates: tokio::sync::broadcast::Sender<iroh_hp::Event>,
1330    on_closed: Vec<oneshot::Sender<(ConnectionError, ConnectionStats)>>,
1331}
1332
1333impl State {
1334    #[allow(clippy::too_many_arguments)]
1335    fn new(
1336        inner: proto::Connection,
1337        handle: ConnectionHandle,
1338        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1339        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1340        on_handshake_data: oneshot::Sender<()>,
1341        on_connected: oneshot::Sender<bool>,
1342        sender: Pin<Box<dyn UdpSender>>,
1343        runtime: Arc<dyn Runtime>,
1344    ) -> Self {
1345        Self {
1346            inner,
1347            driver: None,
1348            handle,
1349            on_handshake_data: Some(on_handshake_data),
1350            on_connected: Some(on_connected),
1351            connected: false,
1352            handshake_confirmed: false,
1353            timer: None,
1354            timer_deadline: None,
1355            conn_events,
1356            endpoint_events,
1357            blocked_writers: FxHashMap::default(),
1358            blocked_readers: FxHashMap::default(),
1359            stopped: FxHashMap::default(),
1360            open_path: FxHashMap::default(),
1361            close_path: FxHashMap::default(),
1362            error: None,
1363            ref_count: 0,
1364            sender,
1365            runtime,
1366            send_buffer: Vec::new(),
1367            buffered_transmit: None,
1368            path_events: tokio::sync::broadcast::channel(32).0,
1369            observed_external_addr: watch::Sender::new(None),
1370            nat_traversal_updates: tokio::sync::broadcast::channel(32).0,
1371            on_closed: Vec::new(),
1372        }
1373    }
1374
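    /// Sends as many queued transmits as the I/O and the per-call datagram budget allow.
    ///
    /// Returns `Ok(true)` when `MAX_TRANSMIT_DATAGRAMS` was reached and the caller should come
    /// back for more, and `Ok(false)` when the sender would block (the transmit is buffered for
    /// retry) or when there is nothing left to send.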
1375    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1376        let now = self.runtime.now();
1377        let mut transmits = 0;
1378
1379        let max_datagrams = self
1380            .sender
1381            .max_transmit_segments()
1382            .min(MAX_TRANSMIT_SEGMENTS);
1383
1384        loop {
1385            // Retry the last transmit, or get a new one.
1386            let t = match self.buffered_transmit.take() {
1387                Some(t) => t,
1388                None => {
1389                    self.send_buffer.clear();
1390                    match self
1391                        .inner
1392                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1393                    {
1394                        Some(t) => {
1395                            transmits += match t.segment_size {
1396                                None => 1,
1397                                Some(s) => t.size.div_ceil(s), // round up
1398                            };
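                            // Example: a 4800-byte GSO transmit with `segment_size` of 1200
                            // counts as 4800.div_ceil(1200) == 4 datagrams towards the
                            // `MAX_TRANSMIT_DATAGRAMS` budget checked below.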
1399                            t
1400                        }
1401                        None => break,
1402                    }
1403                }
1404            };
1405
1406            let len = t.size;
1407            match self
1408                .sender
1409                .as_mut()
1410                .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx)
1411            {
1412                Poll::Pending => {
1413                    self.buffered_transmit = Some(t);
1414                    return Ok(false);
1415                }
1416                Poll::Ready(Err(e)) => return Err(e),
1417                Poll::Ready(Ok(())) => {}
1418            }
1419
1420            if transmits >= MAX_TRANSMIT_DATAGRAMS {
1421                // TODO: If we stop before polling every datagram that could be sent,
1422                // we never enter the `app_limited` state, so the congestion window
1423                // keeps growing until we reach this point again.
1424                // See https://github.com/quinn-rs/quinn/issues/1126
1425                return Ok(true);
1426            }
1427        }
1428
1429        Ok(false)
1430    }
1431
1432    fn forward_endpoint_events(&mut self) {
1433        while let Some(event) = self.inner.poll_endpoint_events() {
1434            // If the endpoint driver is gone, this is a no-op.
1435            let _ = self.endpoint_events.send((self.handle, event));
1436        }
1437    }
1438
1439    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1440    fn process_conn_events(
1441        &mut self,
1442        shared: &Shared,
1443        cx: &mut Context,
1444    ) -> Result<(), ConnectionError> {
1445        loop {
1446            match self.conn_events.poll_recv(cx) {
1447                Poll::Ready(Some(ConnectionEvent::Rebind(sender))) => {
1448                    self.sender = sender;
1449                    self.inner.local_address_changed();
1450                }
1451                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1452                    self.inner.handle_event(event);
1453                }
1454                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1455                    self.close(error_code, reason, shared);
1456                }
1457                Poll::Ready(None) => {
1458                    return Err(ConnectionError::TransportError(TransportError::new(
1459                        TransportErrorCode::INTERNAL_ERROR,
1460                        "endpoint driver future was dropped".to_string(),
1461                    )));
1462                }
1463                Poll::Pending => {
1464                    return Ok(());
1465                }
1466            }
1467        }
1468    }
1469
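    /// Drains application-facing events from the proto-level connection and routes each one to
    /// whoever is waiting on it: the handshake/connected channels, the stream and datagram
    /// `Notify`s in [`Shared`], the path-event and NAT-traversal broadcast channels, and the
    /// observed-address watch.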
1470    fn forward_app_events(&mut self, shared: &Shared) {
1471        while let Some(event) = self.inner.poll() {
1472            use proto::Event::*;
1473            match event {
1474                HandshakeDataReady => {
1475                    if let Some(x) = self.on_handshake_data.take() {
1476                        let _ = x.send(());
1477                    }
1478                }
1479                Connected => {
1480                    self.connected = true;
1481                    if let Some(x) = self.on_connected.take() {
1482                        // We don't care if the on-connected future was dropped
1483                        let _ = x.send(self.inner.accepted_0rtt());
1484                    }
1485                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1486                        // Wake up rejected 0-RTT streams so they can fail immediately with
1487                        // `ZeroRttRejected` errors.
1488                        wake_all(&mut self.blocked_writers);
1489                        wake_all(&mut self.blocked_readers);
1490                        wake_all_notify(&mut self.stopped);
1491                    }
1492                }
1493                HandshakeConfirmed => {
1494                    self.handshake_confirmed = true;
1495                    shared.handshake_confirmed.notify_waiters();
1496                }
1497                ConnectionLost { reason } => {
1498                    self.terminate(reason, shared);
1499                }
1500                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1501                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1502                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1503                }
1504                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1505                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1506                }
1507                DatagramReceived => {
1508                    shared.datagram_received.notify_waiters();
1509                }
1510                DatagramsUnblocked => {
1511                    shared.datagrams_unblocked.notify_waiters();
1512                }
1513                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1514                Stream(StreamEvent::Available { dir }) => {
1515                    // Might mean any number of streams are ready, so we wake up everyone
1516                    shared.stream_budget_available[dir as usize].notify_waiters();
1517                }
1518                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1519                Stream(StreamEvent::Stopped { id, .. }) => {
1520                    wake_stream_notify(id, &mut self.stopped);
1521                    wake_stream(id, &mut self.blocked_writers);
1522                }
1523                Path(ref evt @ PathEvent::ObservedAddr { addr: observed, .. }) => {
1524                    self.path_events.send(evt.clone()).ok();
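                    // Only wake `observed_external_addr` watchers when the reported address
                    // actually changed; `send_if_modified` skips the notification when the
                    // closure returns `false`.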
1525                    self.observed_external_addr.send_if_modified(|addr| {
1526                        let old = addr.replace(observed);
1527                        old != *addr
1528                    });
1529                }
1530                Path(ref evt @ PathEvent::Opened { id }) => {
1531                    self.path_events.send(evt.clone()).ok();
1532                    if let Some(sender) = self.open_path.remove(&id) {
1533                        sender.send_modify(|value| *value = Ok(()));
1534                    }
1535                }
1536                Path(ref evt @ PathEvent::Closed { id, error_code }) => {
1537                    self.path_events.send(evt.clone()).ok();
1538                    if let Some(sender) = self.close_path.remove(&id) {
1539                        let _ = sender.send(error_code);
1540                    }
1541                }
1542                Path(evt @ PathEvent::Abandoned { .. }) => {
1543                    self.path_events.send(evt).ok();
1544                }
1545                Path(ref evt @ PathEvent::LocallyClosed { id, error }) => {
1546                    self.path_events.send(evt.clone()).ok();
1547                    if let Some(sender) = self.open_path.remove(&id) {
1548                        sender.send_modify(|value| *value = Err(error));
1549                    }
1550                    // This also fires for paths that were already opened.
1551                }
1552                Path(evt @ PathEvent::RemoteStatus { .. }) => {
1553                    self.path_events.send(evt).ok();
1554                }
1555                NatTraversal(update) => {
1556                    self.nat_traversal_updates.send(update).ok();
1557                }
1558            }
1559        }
1560    }
1561
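    /// Arms or re-arms the connection timer from `poll_timeout` and feeds an expired deadline
    /// back into the connection via `handle_timeout`.
    ///
    /// Returns `true` if a timeout fired, in which case the caller should poll for new
    /// transmits and timers again; `false` otherwise.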
1562    fn drive_timer(&mut self, cx: &mut Context) -> bool {
1563        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1564        // timer is registered with the runtime (and check whether it's already
1565        // expired).
1566        match self.inner.poll_timeout() {
1567            Some(deadline) => {
1568                if let Some(delay) = &mut self.timer {
1569                    // There is no need to reset the timer if the deadline
1570                    // did not change.
1571                    if self
1572                        .timer_deadline
1573                        .map(|current_deadline| current_deadline != deadline)
1574                        .unwrap_or(true)
1575                    {
1576                        delay.as_mut().reset(deadline);
1577                    }
1578                } else {
1579                    self.timer = Some(self.runtime.new_timer(deadline));
1580                }
1581                // Store the actual expiration time of the timer
1582                self.timer_deadline = Some(deadline);
1583            }
1584            None => {
1585                self.timer_deadline = None;
1586                return false;
1587            }
1588        }
1589
1590        if self.timer_deadline.is_none() {
1591            return false;
1592        }
1593
1594        let delay = self
1595            .timer
1596            .as_mut()
1597            .expect("timer must exist in this state")
1598            .as_mut();
1599        if delay.poll(cx).is_pending() {
1600            // Since there wasn't a timeout event, there is nothing new
1601            // for the connection to do
1602            return false;
1603        }
1604
1605        // A timer expired, so the caller needs to check for
1606        // new transmits, which might cause new timers to be set.
1607        self.inner.handle_timeout(self.runtime.now());
1608        self.timer_deadline = None;
1609        true
1610    }
1611
1612    /// Wake up a blocked `Driver` task to process I/O
1613    pub(crate) fn wake(&mut self) {
1614        if let Some(x) = self.driver.take() {
1615            x.wake();
1616        }
1617    }
1618
1619    /// Used to wake up all blocked futures when the connection becomes closed for any reason
1620    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1621        self.error = Some(reason.clone());
1622        if let Some(x) = self.on_handshake_data.take() {
1623            let _ = x.send(());
1624        }
1625        wake_all(&mut self.blocked_writers);
1626        wake_all(&mut self.blocked_readers);
1627        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1628        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1629        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1630        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1631        shared.datagram_received.notify_waiters();
1632        shared.datagrams_unblocked.notify_waiters();
1633        if let Some(x) = self.on_connected.take() {
1634            let _ = x.send(false);
1635        }
1636        shared.handshake_confirmed.notify_waiters();
1637        wake_all_notify(&mut self.stopped);
1638        shared.closed.notify_waiters();
1639
1640        // Send to the registered on_closed futures.
1641        let stats = self.inner.stats();
1642        for tx in self.on_closed.drain(..) {
1643            tx.send((reason.clone(), stats.clone())).ok();
1644        }
1645    }
1646
1647    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1648        self.inner.close(self.runtime.now(), error_code, reason);
1649        self.terminate(ConnectionError::LocallyClosed, shared);
1650        self.wake();
1651    }
1652
1653    /// Close for a reason other than the application's explicit request
1654    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1655        self.close(0u32.into(), Bytes::new(), shared);
1656    }
1657
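    /// Returns `Ok(())` while 0-RTT data remains acceptable: the handshake is still in
    /// progress, the peer accepted our 0-RTT data, or we are the server. Returns `Err(())`
    /// on a client whose 0-RTT data was rejected once the handshake has completed.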
1658    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1659        if self.inner.is_handshaking()
1660            || self.inner.accepted_0rtt()
1661            || self.inner.side().is_server()
1662        {
1663            Ok(())
1664        } else {
1665            Err(())
1666        }
1667    }
1668}
1669
1670impl Drop for State {
1671    fn drop(&mut self) {
1672        if !self.inner.is_drained() {
1673            // Ensure the endpoint can tidy up
1674            let _ = self
1675                .endpoint_events
1676                .send((self.handle, proto::EndpointEvent::drained()));
1677        }
1678
1679        if !self.on_closed.is_empty() {
1680            // Ensure that all on_closed oneshot senders are triggered before dropping.
1681            let reason = self.error.as_ref().expect("closed without error reason");
1682            let stats = self.inner.stats();
1683            for tx in self.on_closed.drain(..) {
1684                tx.send((reason.clone(), stats.clone())).ok();
1685            }
1686        }
1687    }
1688}
1689
1690impl fmt::Debug for State {
1691    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1692        f.debug_struct("State").field("inner", &self.inner).finish()
1693    }
1694}
1695
1696fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1697    if let Some(waker) = wakers.remove(&stream_id) {
1698        waker.wake();
1699    }
1700}
1701
1702fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1703    wakers.drain().for_each(|(_, waker)| waker.wake())
1704}
1705
1706fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1707    if let Some(notify) = wakers.remove(&stream_id) {
1708        notify.notify_waiters()
1709    }
1710}
1711
1712fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1713    wakers
1714        .drain()
1715        .for_each(|(_, notify)| notify.notify_waiters())
1716}
1717
1718/// Errors that can arise when sending a datagram
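///
/// ## Example
///
/// A minimal sketch of handling these errors; `conn` is assumed to be an established
/// [`Connection`], and `send_datagram` the datagram-sending method defined earlier in this
/// module:
///
/// ```ignore
/// match conn.send_datagram(Bytes::from_static(b"ping")) {
///     Ok(()) => {}
///     Err(SendDatagramError::TooLarge) => {
///         // Shrink or drop the payload; it exceeds what the path/peer currently allows.
///     }
///     Err(SendDatagramError::UnsupportedByPeer | SendDatagramError::Disabled) => {
///         // Fall back to a stream instead of datagrams.
///     }
///     Err(SendDatagramError::ConnectionLost(_)) => {
///         // The connection is gone; give up.
///     }
/// }
/// ```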
1719#[derive(Debug, Error, Clone, Eq, PartialEq)]
1720pub enum SendDatagramError {
1721    /// The peer does not support receiving datagram frames
1722    #[error("datagrams not supported by peer")]
1723    UnsupportedByPeer,
1724    /// Datagram support is disabled locally
1725    #[error("datagram support disabled")]
1726    Disabled,
1727    /// The datagram is larger than the connection can currently accommodate
1728    ///
1729    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1730    /// exceeded.
1731    #[error("datagram too large")]
1732    TooLarge,
1733    /// The connection was lost
1734    #[error("connection lost")]
1735    ConnectionLost(#[from] ConnectionError),
1736}
1737
1738/// The maximum number of datagrams produced in a single `drive_transmit` call
1739///
1740/// This limits the amount of CPU resources consumed by datagram generation,
1741/// and allows other tasks (like receiving ACKs) to run in between.
1742const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1743
1744/// The maximum number of datagrams sent in a single transmit
1745///
1746/// This can be lower than what the platform supports, to avoid excessive
1747/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
1748/// that values around 10 are a good compromise.
1749const MAX_TRANSMIT_SEGMENTS: usize = 10;
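
// Taken together (rough illustration, assuming a typical ~1200-byte QUIC datagram): one GSO
// transmit carries at most MAX_TRANSMIT_SEGMENTS * 1200 ≈ 12 kB in `send_buffer`, and one
// `drive_transmit` call sends at most MAX_TRANSMIT_DATAGRAMS datagrams before yielding.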