iroh_quinn/
connection.rs

use std::{
    any::Any,
    fmt,
    future::Future,
    io,
    net::{IpAddr, SocketAddr},
    num::NonZeroUsize,
    pin::Pin,
    sync::{Arc, Weak},
    task::{Context, Poll, Waker, ready},
};

use bytes::Bytes;
use pin_project_lite::pin_project;
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{Notify, futures::Notified, mpsc, oneshot, watch};
use tracing::{Instrument, Span, debug_span};

use crate::{
    ConnectionEvent, Duration, Instant, Path, VarInt,
    endpoint::ensure_ipv6,
    mutex::Mutex,
    path::OpenPath,
    recv_stream::RecvStream,
    runtime::{AsyncTimer, Runtime, UdpSender},
    send_stream::SendStream,
    udp_transmit,
};
use proto::{
    ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, PathError, PathEvent,
    PathId, PathStats, PathStatus, Side, StreamEvent, StreamId, TransportError, TransportErrorCode,
    congestion::Controller, iroh_hp,
};

/// In-progress connection attempt future
#[derive(Debug)]
pub struct Connecting {
    conn: Option<ConnectionRef>,
    connected: oneshot::Receiver<bool>,
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}

impl Connecting {
    pub(crate) fn new(
        handle: ConnectionHandle,
        conn: proto::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        sender: Pin<Box<dyn UdpSender>>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
        let (on_connected_send, on_connected_recv) = oneshot::channel();

        let conn = ConnectionRef(Arc::new(ConnectionInner {
            state: Mutex::new(State::new(
                conn,
                handle,
                endpoint_events,
                conn_events,
                on_handshake_data_send,
                on_connected_send,
                sender,
                runtime.clone(),
            )),
            shared: Shared::default(),
        }));

        let driver = ConnectionDriver(conn.clone());
        runtime.spawn(Box::pin(
            async {
                if let Err(e) = driver.await {
                    tracing::error!("I/O error: {e}");
                }
            }
            .instrument(Span::current()),
        ));

        Self {
            conn: Some(conn),
            connected: on_connected_recv,
            handshake_data_ready: Some(on_handshake_data_recv),
        }
    }

    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
    ///
    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
    /// If so, the returned [`Connection`] can be used to send application data without waiting for
    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
    /// complete, at which point subsequently opened streams and written data will have full
    /// cryptographic protection.
    ///
    /// ## Outgoing
    ///
    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
    /// If it was rejected, the existence of streams opened and other application data sent prior
    /// to the handshake completing will not be conveyed to the remote application, and local
    /// operations on them will return `ZeroRttRejected` errors.
    ///
    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
    /// relevant resumption state to be stored in the server, which servers may limit or lose for
    /// various reasons including not persisting resumption state across server restarts.
    ///
    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Incoming
    ///
    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
    ///
    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Security
    ///
    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
    /// replay attacks, and should therefore never invoke non-idempotent operations.
    ///
    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
    /// before TLS client authentication has occurred, and should therefore not be used to send
    /// data for which client authentication is being used.
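    ///
    /// # Example
    ///
    /// A minimal sketch for an outgoing connection (assuming the crate is referenced as
    /// `iroh_quinn` and `connecting` is an in-progress [`Connecting`]):
    ///
    /// ```no_run
    /// # async fn example(connecting: iroh_quinn::Connecting) -> Result<(), Box<dyn std::error::Error>> {
    /// let connection = match connecting.into_0rtt() {
    ///     Ok((connection, zero_rtt_accepted)) => {
    ///         // Application data may be sent right away, but 0-RTT data is replayable,
    ///         // so only use it for idempotent operations. `zero_rtt_accepted` can be
    ///         // awaited later to learn whether the peer actually accepted the 0-RTT data.
    ///         let _ = zero_rtt_accepted;
    ///         connection
    ///     }
    ///     // No resumable TLS session: fall back to the full handshake.
    ///     Err(connecting) => connecting.await?,
    /// };
    /// # drop(connection);
    /// # Ok(())
    /// # }
    /// ```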
    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
        // have to release it explicitly before returning `self` by value.
        let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");

        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
        drop(conn);

        if is_ok {
            let conn = self.conn.take().unwrap();
            Ok((Connection(conn), ZeroRttAccepted(self.connected)))
        } else {
            Err(self)
        }
    }

    /// Parameters negotiated during the handshake
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a
    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
        // simple.
        if let Some(x) = self.handshake_data_ready.take() {
            let _ = x.await;
        }
        let conn = self.conn.as_ref().unwrap();
        let inner = conn.state.lock("handshake");
        inner
            .inner
            .crypto_session()
            .handshake_data()
            .ok_or_else(|| {
                inner
                    .error
                    .clone()
                    .expect("spurious handshake data ready notification")
            })
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn local_ip(&self) -> Option<IpAddr> {
        let conn = self.conn.as_ref().unwrap();
        let inner = conn.state.lock("local_ip");

        inner.inner.local_ip()
    }

    /// The peer's UDP address
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn remote_address(&self) -> SocketAddr {
        let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
        // TODO: another unwrap
        conn_ref
            .state
            .lock("remote_address")
            .inner
            .path_remote_address(PathId::ZERO)
            .expect("path exists when connecting")
    }
}

impl Future for Connecting {
    type Output = Result<Connection, ConnectionError>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.connected).poll(cx).map(|_| {
            let conn = self.conn.take().unwrap();
            let inner = conn.state.lock("connecting");
            if inner.connected {
                drop(inner);
                Ok(Connection(conn))
            } else {
                Err(inner
                    .error
                    .clone()
                    .expect("connected signaled without connection success or error"))
            }
        })
    }
}

/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);

impl Future for ZeroRttAccepted {
    type Output = bool;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
    }
}

/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events between the
/// `Connection` API object, the `Endpoint` task, and the related stream interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// If the connection encounters an error condition, this future will yield an error. It will
/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
/// connection-related futures, this waits for the draining period to complete to ensure that
/// packets still in flight from the peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);

impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}

/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);

impl Connection {
    /// Returns a weak reference to the inner connection struct.
    pub fn weak_handle(&self) -> WeakConnectionHandle {
        WeakConnectionHandle(Arc::downgrade(&self.0.0))
    }

    /// Initiate a new outgoing unidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used.
    pub fn open_uni(&self) -> OpenUni<'_> {
        OpenUni {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
        }
    }

    /// Initiate a new outgoing bidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
    /// anything to [`SendStream`] will never succeed.
    ///
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
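    ///
    /// # Example
    ///
    /// A minimal sketch (assuming the crate is referenced as `iroh_quinn`; the write uses
    /// the quinn-style [`SendStream`] API):
    ///
    /// ```no_run
    /// # async fn example(conn: &iroh_quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// let (mut send, _recv) = conn.open_bi().await?;
    /// // Write something first; the peer only learns about the stream once data arrives.
    /// send.write_all(b"hello").await?;
    /// # Ok(())
    /// # }
    /// ```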
    pub fn open_bi(&self) -> OpenBi<'_> {
        OpenBi {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
        }
    }

    /// Accept the next incoming uni-directional stream
    pub fn accept_uni(&self) -> AcceptUni<'_> {
        AcceptUni {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
        }
    }

    /// Accept the next incoming bidirectional stream
    ///
    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
    ///
    /// [`accept_bi()`]: crate::Connection::accept_bi
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
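    ///
    /// # Example
    ///
    /// A minimal sketch of an accept loop (assuming the crate is referenced as
    /// `iroh_quinn`; reading uses the quinn-style [`RecvStream`] API):
    ///
    /// ```no_run
    /// # async fn example(conn: &iroh_quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// while let Ok((_send, mut recv)) = conn.accept_bi().await {
    ///     // The peer opened this stream and has already written some data to it.
    ///     let data = recv.read_to_end(64 * 1024).await?;
    ///     println!("received {} bytes", data.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```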
    pub fn accept_bi(&self) -> AcceptBi<'_> {
        AcceptBi {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
        }
    }

    /// Receive an application datagram
    pub fn read_datagram(&self) -> ReadDatagram<'_> {
        ReadDatagram {
            conn: &self.0,
            notify: self.0.shared.datagram_received.notified(),
        }
    }

    /// Opens a new path if no path exists yet for the remote address.
    ///
    /// Otherwise behaves exactly as [`open_path`].
    ///
    /// [`open_path`]: Self::open_path
    pub fn open_path_ensure(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath {
        let mut state = self.0.state.lock("open_path");

        // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
        // If not, we do not support IPv6. We cannot access endpoint::State from here,
        // but either all our paths use IPv6 addresses or all of them use IPv4 addresses,
        // so we can rely on that instead.
        let ipv6 = state
            .inner
            .paths()
            .iter()
            .filter_map(|id| {
                state
                    .inner
                    .path_remote_address(*id)
                    .map(|ip| ip.is_ipv6())
                    .ok()
            })
            .next()
            .unwrap_or_default();
        if addr.is_ipv6() && !ipv6 {
            return OpenPath::rejected(PathError::InvalidRemoteAddress(addr));
        }
        let addr = if ipv6 {
            SocketAddr::V6(ensure_ipv6(addr))
        } else {
            addr
        };

        let now = state.runtime.now();
        let open_res = state.inner.open_path_ensure(addr, initial_status, now);
        state.wake();
        match open_res {
            Ok((path_id, existed)) if existed => {
                match state.open_path.get(&path_id).map(|tx| tx.subscribe()) {
                    Some(recv) => OpenPath::new(path_id, recv, self.0.clone()),
                    None => OpenPath::ready(path_id, self.0.clone()),
                }
            }
            Ok((path_id, _)) => {
                let (tx, rx) = watch::channel(Ok(()));
                state.open_path.insert(path_id, tx);
                drop(state);
                OpenPath::new(path_id, rx, self.0.clone())
            }
            Err(err) => OpenPath::rejected(err),
        }
    }

    /// Opens an additional path if the multipath extension is negotiated.
    ///
    /// The returned future completes once the path is either fully opened and ready to
    /// carry application data, or if there was an error.
    ///
    /// Dropping the returned future does not cancel the opening of the path; the
    /// [`PathEvent::Opened`] event will still be emitted from [`Self::path_events`] if the
    /// path opens.  The [`PathId`] for the events can be extracted from
    /// [`OpenPath::path_id`].
    ///
    /// Failure to open a path can either occur immediately, before polling the returned
    /// future, or at a later time.  If the failure is immediate [`OpenPath::path_id`] will
    /// return `None` and the future will be ready immediately.  If the failure happens
    /// later, a [`PathEvent`] will be emitted.
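    ///
    /// # Example
    ///
    /// A minimal sketch (assuming the crate is referenced as `iroh_quinn` and re-exports
    /// [`PathStatus`]; the remote address and initial status are taken as parameters):
    ///
    /// ```no_run
    /// # use std::net::SocketAddr;
    /// # async fn example(conn: &iroh_quinn::Connection, remote: SocketAddr, status: iroh_quinn::PathStatus) {
    /// // Subscribe to path events before opening so no event is missed.
    /// let mut events = conn.path_events();
    /// let opening = conn.open_path(remote, status);
    /// // Completes once the path is ready to carry application data, or reports an error.
    /// let _ = opening.await;
    /// // Further `PathEvent`s for this and other paths arrive via `events.recv().await`.
    /// # drop(events);
    /// # }
    /// ```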
    pub fn open_path(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath {
        let mut state = self.0.state.lock("open_path");

        // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
        // If not, we do not support IPv6. We cannot access endpoint::State from here,
        // but either all our paths use IPv6 addresses or all of them use IPv4 addresses,
        // so we can rely on that instead.
        let ipv6 = state
            .inner
            .paths()
            .iter()
            .filter_map(|id| {
                state
                    .inner
                    .path_remote_address(*id)
                    .map(|ip| ip.is_ipv6())
                    .ok()
            })
            .next()
            .unwrap_or_default();
        if addr.is_ipv6() && !ipv6 {
            return OpenPath::rejected(PathError::InvalidRemoteAddress(addr));
        }
        let addr = if ipv6 {
            SocketAddr::V6(ensure_ipv6(addr))
        } else {
            addr
        };

        let (on_open_path_send, on_open_path_recv) = watch::channel(Ok(()));
        let now = state.runtime.now();
        let open_res = state.inner.open_path(addr, initial_status, now);
        state.wake();
        match open_res {
            Ok(path_id) => {
                state.open_path.insert(path_id, on_open_path_send);
                drop(state);
                OpenPath::new(path_id, on_open_path_recv, self.0.clone())
            }
            Err(err) => OpenPath::rejected(err),
        }
    }

    /// Returns the [`Path`] structure of an open path
    pub fn path(&self, id: PathId) -> Option<Path> {
        // TODO(flub): Using this to know if the path still exists is... hacky.
        self.0.state.lock("path").inner.path_status(id).ok()?;
        Some(Path {
            id,
            conn: self.0.clone(),
        })
    }

    /// A broadcast receiver of [`PathEvent`]s for all paths in this connection
    pub fn path_events(&self) -> tokio::sync::broadcast::Receiver<PathEvent> {
        self.0.state.lock("path_events").path_events.subscribe()
    }

    /// A broadcast receiver of [`iroh_hp::Event`]s for updates about server addresses
    pub fn nat_traversal_updates(&self) -> tokio::sync::broadcast::Receiver<iroh_hp::Event> {
        self.0
            .state
            .lock("nat_traversal_updates")
            .nat_traversal_updates
            .subscribe()
    }

    /// Wait for the connection to be closed for any reason
    ///
    /// Despite the return type's name, closed connections are often not an error condition at the
    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
    /// and [`ConnectionError::ApplicationClosed`].
    pub async fn closed(&self) -> ConnectionError {
        {
            let conn = self.0.state.lock("closed");
            if let Some(error) = conn.error.as_ref() {
                return error.clone();
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.closed.notified()
        }
        .await;
        self.0
            .state
            .lock("closed")
            .error
            .as_ref()
            .expect("closed without an error")
            .clone()
    }

    /// Wait for the connection to be closed without keeping a strong reference to the connection
    ///
    /// Returns a future that resolves, once the connection is closed, to a tuple of
    /// ([`ConnectionError`], [`ConnectionStats`]).
    ///
    /// Calling [`Self::closed`] keeps the connection alive until it is either closed locally via [`Connection::close`]
    /// or closed by the remote peer. This function, by contrast, does not keep the connection
    /// alive, so if all *other* clones of the connection are dropped, the connection will be
    /// closed implicitly even while futures returned from this function are still being awaited.
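    ///
    /// # Example
    ///
    /// A minimal sketch (assuming the crate is referenced as `iroh_quinn`):
    ///
    /// ```no_run
    /// # async fn example(conn: iroh_quinn::Connection) {
    /// let on_closed = conn.on_closed();
    /// // `on_closed` does not keep the connection alive: dropping the last strong
    /// // handle closes the connection implicitly.
    /// drop(conn);
    /// let (error, _stats) = on_closed.await;
    /// println!("connection closed: {error}");
    /// # }
    /// ```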
    pub fn on_closed(&self) -> OnClosed {
        let (tx, rx) = oneshot::channel();
        self.0.state.lock("on_closed").on_closed.push(tx);
        OnClosed {
            conn: self.weak_handle(),
            rx,
        }
    }

    /// If the connection is closed, the reason why.
    ///
    /// Returns `None` if the connection is still open.
    pub fn close_reason(&self) -> Option<ConnectionError> {
        self.0.state.lock("close_reason").error.clone()
    }

    /// Close the connection immediately.
    ///
    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
    /// more data is sent to the peer and the peer may drop buffered data upon receiving
    /// the CONNECTION_CLOSE frame.
    ///
    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
    ///
    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
    /// is preserved in full, it should be kept under 1KiB.
    ///
    /// # Gracefully closing a connection
    ///
    /// Only the peer last receiving application data can be certain that all data is
    /// delivered. The only reliable action it can then take is to close the connection,
    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
    /// frame is very likely if both endpoints stay online long enough, and
    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
    /// remote peer will time out the connection, provided that the idle timeout is not
    /// disabled.
    ///
    /// The sending side can not guarantee all stream data is delivered to the remote
    /// application. It only knows the data is delivered to the QUIC stack of the remote
    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
    /// [`close()`] the remote endpoint may drop any data it received but is as yet
    /// undelivered to the application, including data that was acknowledged as received to
    /// the local endpoint.
    ///
    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
    /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
    /// [`close()`]: Connection::close
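    ///
    /// # Example
    ///
    /// A minimal sketch (assuming the crate is referenced as `iroh_quinn`): the side that
    /// received the last application data closes the connection with application error
    /// code `0`:
    ///
    /// ```no_run
    /// # fn example(conn: &iroh_quinn::Connection) {
    /// // `error_code` and `reason` are delivered to the peer verbatim.
    /// conn.close(0u32.into(), b"done");
    /// # }
    /// ```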
    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
        let conn = &mut *self.0.state.lock("close");
        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
    }

    /// Wait for the handshake to be confirmed.
    ///
    /// As a server, which must be authenticated by its clients, this happens when the
    /// handshake completes upon receiving the client's TLS Finished message. In response,
    /// the server sends a HANDSHAKE_DONE frame.
    ///
    /// As a client, this happens when receiving a HANDSHAKE_DONE frame.
    /// At this point, the server has either accepted our authentication,
    /// or, if client authentication is not required, accepted our lack of authentication.
    pub async fn handshake_confirmed(&self) -> Result<(), ConnectionError> {
        {
            let conn = self.0.state.lock("handshake_confirmed");
            if let Some(error) = conn.error.as_ref() {
                return Err(error.clone());
            }
            if conn.handshake_confirmed {
                return Ok(());
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.handshake_confirmed.notified()
        }
        .await;
        if let Some(error) = self.0.state.lock("handshake_confirmed").error.as_ref() {
            Err(error.clone())
        } else {
            Ok(())
        }
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
    /// dictated by the peer.
    ///
    /// Previously queued datagrams which are still unsent may be discarded to make space for this
    /// datagram, in order of oldest to newest.
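    ///
    /// # Example
    ///
    /// A minimal sketch of sending and receiving datagrams (assuming the crate is
    /// referenced as `iroh_quinn`):
    ///
    /// ```no_run
    /// # async fn example(conn: &iroh_quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    /// // Stay within the currently allowed datagram size.
    /// if conn.max_datagram_size().is_some_and(|max| max >= 4) {
    ///     conn.send_datagram(bytes::Bytes::from_static(b"ping"))?;
    /// }
    /// // Receive the next datagram from the peer.
    /// let payload = conn.read_datagram().await?;
    /// println!("got {} bytes", payload.len());
    /// # Ok(())
    /// # }
    /// ```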
    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
        let conn = &mut *self.0.state.lock("send_datagram");
        if let Some(ref x) = conn.error {
            return Err(SendDatagramError::ConnectionLost(x.clone()));
        }
        use proto::SendDatagramError::*;
        match conn.inner.datagrams().send(data, true) {
            Ok(()) => {
                conn.wake();
                Ok(())
            }
            Err(e) => Err(match e {
                Blocked(..) => unreachable!(),
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            }),
        }
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
    /// conditions, which effectively prioritizes old datagrams over new datagrams.
    ///
    /// See [`send_datagram()`] for details.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
        SendDatagram {
            conn: &self.0,
            data: Some(data),
            notify: self.0.shared.datagrams_unblocked.notified(),
        }
    }

    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
    ///
    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
    ///
    /// This may change over the lifetime of a connection according to variation in the path MTU
    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
    ///
    /// Not necessarily the maximum size of received datagrams.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn max_datagram_size(&self) -> Option<usize> {
        self.0
            .state
            .lock("max_datagram_size")
            .inner
            .datagrams()
            .max_size()
    }

    /// Bytes available in the outgoing datagram buffer
    ///
    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
    /// at most this size is guaranteed not to cause older datagrams to be dropped.
    pub fn datagram_send_buffer_space(&self) -> usize {
        self.0
            .state
            .lock("datagram_send_buffer_space")
            .inner
            .datagrams()
            .send_buffer_space()
    }

    /// The side of the connection (client or server)
    pub fn side(&self) -> Side {
        self.0.state.lock("side").inner.side()
    }

    /// The peer's UDP address
    ///
    /// If [`ServerConfig::migration`] is `true`, clients may change addresses at will,
    /// e.g. when switching to a cellular internet connection.
    ///
    /// If [`multipath`] is enabled this will return the address of *any*
    /// path, and may not be consistent. Prefer [`Path::remote_address`] instead.
    ///
    /// [`ServerConfig::migration`]: crate::ServerConfig::migration
    /// [`multipath`]: crate::TransportConfig::max_concurrent_multipath_paths
    pub fn remote_address(&self) -> SocketAddr {
        // TODO: an unwrap again
        let state = self.0.state.lock("remote_address");
        state
            .inner
            .paths()
            .iter()
            .filter_map(|id| state.inner.path_remote_address(*id).ok())
            .next()
            .unwrap()
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.0.state.lock("local_ip").inner.local_ip()
    }

    /// Current best estimate of this connection's latency (round-trip-time)
    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
        self.0.state.lock("rtt").inner.rtt(path_id)
    }

    /// Returns connection statistics
    pub fn stats(&self) -> ConnectionStats {
        self.0.state.lock("stats").inner.stats()
    }

    /// Returns path statistics
    pub fn path_stats(&self, path_id: PathId) -> Option<PathStats> {
        self.0.state.lock("path_stats").inner.path_stats(path_id)
    }

    /// Current state of the congestion control algorithm, for debugging purposes
    pub fn congestion_state(&self, path_id: PathId) -> Option<Box<dyn Controller>> {
        self.0
            .state
            .lock("congestion_state")
            .inner
            .congestion_state(path_id)
            .map(|c| c.clone_box())
    }

    /// Parameters negotiated during the handshake
    ///
    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
    /// the returned value.
    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("handshake_data")
            .inner
            .crypto_session()
            .handshake_data()
    }

    /// Cryptographic identity of the peer
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("peer_identity")
            .inner
            .crypto_session()
            .peer_identity()
    }

    /// A stable identifier for this connection
    ///
    /// Peer addresses and connection IDs can change, but this value will remain
    /// fixed for the lifetime of the connection.
    pub fn stable_id(&self) -> usize {
        self.0.stable_id()
    }

    /// Update traffic keys spontaneously
    ///
    /// This primarily exists for testing purposes.
    pub fn force_key_update(&self) {
        self.0
            .state
            .lock("force_key_update")
            .inner
            .force_key_update()
    }

    /// Derive keying material from this connection's TLS session secrets.
    ///
    /// When both peers call this method with the same `label` and `context`
    /// arguments and `output` buffers of equal length, they will get the
    /// same sequence of bytes in `output`. These bytes are cryptographically
    /// strong and pseudorandom, and are suitable for use as keying material.
    ///
    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
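    ///
    /// # Example
    ///
    /// A minimal sketch deriving 32 bytes of keying material (assuming the crate is
    /// referenced as `iroh_quinn`); both peers must use the same `label` and `context`:
    ///
    /// ```no_run
    /// # fn example(conn: &iroh_quinn::Connection) {
    /// let mut key = [0u8; 32];
    /// conn.export_keying_material(&mut key, b"my-application exporter", b"context")
    ///     .expect("output length is within the TLS exporter limit");
    /// // `key` now holds secret bytes shared with the peer.
    /// # }
    /// ```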
    pub fn export_keying_material(
        &self,
        output: &mut [u8],
        label: &[u8],
        context: &[u8],
    ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
        self.0
            .state
            .lock("export_keying_material")
            .inner
            .crypto_session()
            .export_keying_material(output, label, context)
    }

    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// See [`proto::TransportConfig::send_window()`]
    pub fn set_send_window(&self, send_window: u64) {
        let mut conn = self.0.state.lock("set_send_window");
        conn.inner.set_send_window(send_window);
        conn.wake();
    }

    /// See [`proto::TransportConfig::receive_window()`]
    pub fn set_receive_window(&self, receive_window: VarInt) {
        let mut conn = self.0.state.lock("set_receive_window");
        conn.inner.set_receive_window(receive_window);
        conn.wake();
    }

    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// Tracks changes to our external address as reported by the peer.
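    ///
    /// # Example
    ///
    /// A minimal sketch of watching for reports of our external address (assuming the
    /// crate is referenced as `iroh_quinn`):
    ///
    /// ```no_run
    /// # async fn example(conn: &iroh_quinn::Connection) {
    /// let mut external_addr = conn.observed_external_addr();
    /// // Wait until the peer reports (a change to) our external address.
    /// if external_addr.changed().await.is_ok() {
    ///     println!("observed external address: {:?}", *external_addr.borrow());
    /// }
    /// # }
    /// ```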
    pub fn observed_external_addr(&self) -> watch::Receiver<Option<SocketAddr>> {
        let conn = self.0.state.lock("external_addr");
        conn.observed_external_addr.subscribe()
    }

    /// Is multipath enabled?
    // TODO(flub): not a useful API, once we do real things with multipath we can remove
    // this again.
    pub fn is_multipath_enabled(&self) -> bool {
        let conn = self.0.state.lock("is_multipath_enabled");
        conn.inner.is_multipath_negotiated()
    }

    /// Registers one address at which this endpoint might be reachable
    ///
    /// When the NAT traversal extension is negotiated, servers send these addresses to clients in
    /// `ADD_ADDRESS` frames. This allows clients to obtain server address candidates to initiate
    /// NAT traversal attempts. Clients provide their own reachable addresses in `REACH_OUT` frames
    /// when [`Self::initiate_nat_traversal_round`] is called.
    pub fn add_nat_traversal_address(&self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
        let mut conn = self.0.state.lock("add_nat_traversal_addresses");
        conn.inner.add_nat_traversal_address(address)
    }

    /// Removes an address from the set of addresses at which this endpoint is reachable
    ///
    /// When the NAT traversal extension is negotiated, servers send address removals to
    /// clients in `REMOVE_ADDRESS` frames. This allows clients to stop using outdated
    /// server address candidates that are no longer valid for NAT traversal.
    ///
    /// For clients, removed addresses will no longer be advertised in `REACH_OUT` frames.
    ///
    /// Addresses not present in the set will be silently ignored.
    pub fn remove_nat_traversal_address(&self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
        let mut conn = self.0.state.lock("remove_nat_traversal_addresses");
        conn.inner.remove_nat_traversal_address(address)
    }

    /// Get the current local nat traversal addresses
    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        let conn = self.0.state.lock("get_local_nat_traversal_addresses");
        conn.inner.get_local_nat_traversal_addresses()
    }

    /// Get the nat traversal addresses currently advertised by the server
    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        let conn = self.0.state.lock("get_remote_nat_traversal_addresses");
        conn.inner.get_remote_nat_traversal_addresses()
    }

    /// Initiates a new nat traversal round
    ///
    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
    /// frames, and initiating probing of the known remote addresses. When a new round is
    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
    ///
    /// Returns the server addresses that are now being probed.
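    ///
    /// # Example
    ///
    /// A minimal sketch of a client kicking off a round (assuming the crate is referenced
    /// as `iroh_quinn` and `local_addr` is an address this endpoint is reachable at):
    ///
    /// ```no_run
    /// # use std::net::SocketAddr;
    /// # fn example(conn: &iroh_quinn::Connection, local_addr: SocketAddr) {
    /// // Advertise one of our own candidate addresses first.
    /// let _ = conn.add_nat_traversal_address(local_addr);
    /// // Start probing the server's currently advertised candidates.
    /// if let Ok(probing) = conn.initiate_nat_traversal_round() {
    ///     println!("probing {} server addresses", probing.len());
    /// }
    /// # }
    /// ```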
    pub fn initiate_nat_traversal_round(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
        let mut conn = self.0.state.lock("initiate_nat_traversal_round");
        let now = conn.runtime.now();
        conn.inner.initiate_nat_traversal_round(now)
    }
}

pin_project! {
    /// Future produced by [`Connection::open_uni`]
    pub struct OpenUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenUni<'_> {
    type Output = Result<SendStream, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::open_bi`]
    pub struct OpenBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;

        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_open<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_open");
    if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    } else if let Some(id) = state.inner.streams().open(dir) {
        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => {
                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    pub struct AcceptUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptUni<'_> {
    type Output = Result<RecvStream, ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    pub struct AcceptBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_accept<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_accept");
    // Check for incoming streams before checking `state.error` so that already-received streams,
    // which are necessarily finite, can be drained from a closed connection.
    if let Some(id) = state.inner.streams().accept(dir) {
        let is_0rtt = state.inner.is_handshaking();
        state.wake(); // To send additional stream ID credit
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    } else if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    pub struct ReadDatagram<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for ReadDatagram<'_> {
    type Output = Result<Bytes, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("ReadDatagram::poll");
        // Check for buffered datagrams before checking `state.error` so that already-received
        // datagrams, which are necessarily finite, can be drained from a closed connection.
        if let Some(x) = state.inner.datagrams().recv() {
            return Poll::Ready(Ok(x));
        } else if let Some(ref e) = state.error {
            return Poll::Ready(Err(e.clone()));
        }
        loop {
            match this.notify.as_mut().poll(ctx) {
                // `state` lock ensures we didn't race with readiness
                Poll::Pending => return Poll::Pending,
                // Spurious wakeup, get a new future
                Poll::Ready(()) => this
                    .notify
                    .set(this.conn.shared.datagram_received.notified()),
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    pub struct SendDatagram<'a> {
        conn: &'a ConnectionRef,
        data: Option<Bytes>,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for SendDatagram<'_> {
    type Output = Result<(), SendDatagramError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("SendDatagram::poll");
        if let Some(ref e) = state.error {
            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
        }
        use proto::SendDatagramError::*;
        match state
            .inner
            .datagrams()
            .send(this.data.take().unwrap(), false)
        {
            Ok(()) => {
                state.wake();
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(match e {
                Blocked(data) => {
                    this.data.replace(data);
                    loop {
                        match this.notify.as_mut().poll(ctx) {
                            Poll::Pending => return Poll::Pending,
                            // Spurious wakeup, get a new future
                            Poll::Ready(()) => this
                                .notify
                                .set(this.conn.shared.datagrams_unblocked.notified()),
                        }
                    }
                }
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            })),
        }
    }
}

/// Future returned by [`Connection::on_closed`]
///
/// Resolves to a tuple of ([`ConnectionError`], [`ConnectionStats`]).
pub struct OnClosed {
    rx: oneshot::Receiver<(ConnectionError, ConnectionStats)>,
    conn: WeakConnectionHandle,
}

impl Drop for OnClosed {
    fn drop(&mut self) {
        if self.rx.is_terminated() {
            return;
        };
        if let Some(conn) = self.conn.upgrade() {
            self.rx.close();
            conn.0
                .state
                .lock("OnClosed::drop")
                .on_closed
                .retain(|tx| !tx.is_closed());
        }
    }
}

impl Future for OnClosed {
    type Output = (ConnectionError, ConnectionStats);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // The `expect` is safe because `State::drop` ensures that all senders are triggered
        // before being dropped.
        Pin::new(&mut this.rx)
            .poll(cx)
            .map(|x| x.expect("on_close sender is never dropped before sending"))
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

impl ConnectionRef {
    fn from_arc(inner: Arc<ConnectionInner>) -> Self {
        inner.state.lock("from_arc").ref_count += 1;
        Self(inner)
    }

    fn stable_id(&self) -> usize {
        &*self.0 as *const _ as usize
    }
}

impl Clone for ConnectionRef {
    fn clone(&self) -> Self {
        Self::from_arc(Arc::clone(&self.0))
    }
}

impl Drop for ConnectionRef {
    fn drop(&mut self) {
        let conn = &mut *self.state.lock("drop");
        if let Some(x) = conn.ref_count.checked_sub(1) {
            conn.ref_count = x;
            if x == 0 && !conn.inner.is_closed() {
                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
                // not, we can't do any harm. If there were any streams being opened, then either
                // the connection will be closed for an unrelated reason or a fresh reference will
                // be constructed for the newly opened stream.
                conn.implicit_close(&self.shared);
            }
        }
    }
}

impl std::ops::Deref for ConnectionRef {
    type Target = ConnectionInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionInner {
    pub(crate) state: Mutex<State>,
    pub(crate) shared: Shared,
}

/// A handle to some connection internals, use with care.
///
/// This contains a weak reference to the connection so will not itself keep the connection
/// alive.
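///
/// # Example
///
/// A minimal sketch (assuming the crate is referenced as `iroh_quinn`):
///
/// ```no_run
/// # fn example(conn: &iroh_quinn::Connection) {
/// let weak = conn.weak_handle();
/// // The weak handle does not keep the connection alive; upgrade it when needed.
/// if let Some(conn) = weak.upgrade() {
///     println!("connection {} is still alive", conn.stable_id());
/// }
/// # }
/// ```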
1250#[derive(Debug, Clone)]
1251pub struct WeakConnectionHandle(Weak<ConnectionInner>);
1252
1253impl WeakConnectionHandle {
1254    /// Returns `true` if the [`Connection`] associated with this handle is still alive.
1255    pub fn is_alive(&self) -> bool {
1256        self.0.upgrade().is_some()
1257    }
1258
1259    /// Upgrade the handle to a full `Connection`
1260    pub fn upgrade(&self) -> Option<Connection> {
1261        self.0
1262            .upgrade()
1263            .map(|inner| Connection(ConnectionRef::from_arc(inner)))
1264    }
1265}
1266
1267#[derive(Debug, Default)]
1268pub(crate) struct Shared {
1269    handshake_confirmed: Notify,
1270    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
1271    /// control budget
1272    stream_budget_available: [Notify; 2],
1273    /// Notified when the peer has initiated a new stream
1274    stream_incoming: [Notify; 2],
1275    datagram_received: Notify,
1276    datagrams_unblocked: Notify,
1277    closed: Notify,
1278}
1279
1280pub(crate) struct State {
1281    pub(crate) inner: proto::Connection,
1282    driver: Option<Waker>,
1283    handle: ConnectionHandle,
1284    on_handshake_data: Option<oneshot::Sender<()>>,
1285    on_connected: Option<oneshot::Sender<bool>>,
1286    connected: bool,
1287    handshake_confirmed: bool,
1288    timer: Option<Pin<Box<dyn AsyncTimer>>>,
1289    timer_deadline: Option<Instant>,
1290    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1291    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1292    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1293    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1294    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1295    /// Always set to Some before the connection becomes drained
1296    pub(crate) error: Option<ConnectionError>,
1297    /// Tracks paths being opened
1298    open_path: FxHashMap<PathId, watch::Sender<Result<(), PathError>>>,
1299    /// Tracks paths being closed
1300    pub(crate) close_path: FxHashMap<PathId, oneshot::Sender<VarInt>>,
1301    pub(crate) path_events: tokio::sync::broadcast::Sender<PathEvent>,
1302    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
1303    ref_count: usize,
1304    sender: Pin<Box<dyn UdpSender>>,
1305    pub(crate) runtime: Arc<dyn Runtime>,
1306    send_buffer: Vec<u8>,
1307    /// We buffer a transmit when the underlying I/O would block
1308    buffered_transmit: Option<proto::Transmit>,
1309    /// Our last external address reported by the peer. When multipath is enabled, this will be the
1310    /// last report across all paths.
1311    pub(crate) observed_external_addr: watch::Sender<Option<SocketAddr>>,
1312    pub(crate) nat_traversal_updates: tokio::sync::broadcast::Sender<iroh_hp::Event>,
1313    on_closed: Vec<oneshot::Sender<(ConnectionError, ConnectionStats)>>,
1314}
1315
1316impl State {
1317    #[allow(clippy::too_many_arguments)]
1318    fn new(
1319        inner: proto::Connection,
1320        handle: ConnectionHandle,
1321        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1322        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1323        on_handshake_data: oneshot::Sender<()>,
1324        on_connected: oneshot::Sender<bool>,
1325        sender: Pin<Box<dyn UdpSender>>,
1326        runtime: Arc<dyn Runtime>,
1327    ) -> Self {
1328        Self {
1329            inner,
1330            driver: None,
1331            handle,
1332            on_handshake_data: Some(on_handshake_data),
1333            on_connected: Some(on_connected),
1334            connected: false,
1335            handshake_confirmed: false,
1336            timer: None,
1337            timer_deadline: None,
1338            conn_events,
1339            endpoint_events,
1340            blocked_writers: FxHashMap::default(),
1341            blocked_readers: FxHashMap::default(),
1342            stopped: FxHashMap::default(),
1343            open_path: FxHashMap::default(),
1344            close_path: FxHashMap::default(),
1345            error: None,
1346            ref_count: 0,
1347            sender,
1348            runtime,
1349            send_buffer: Vec::new(),
1350            buffered_transmit: None,
1351            path_events: tokio::sync::broadcast::channel(32).0,
1352            observed_external_addr: watch::Sender::new(None),
1353            nat_traversal_updates: tokio::sync::broadcast::channel(32).0,
1354            on_closed: Vec::new(),
1355        }
1356    }
1357
1358    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1359        let now = self.runtime.now();
1360        let mut transmits = 0;
1361
1362        let max_datagrams = self
1363            .sender
1364            .max_transmit_segments()
1365            .min(MAX_TRANSMIT_SEGMENTS);
1366
1367        loop {
1368            // Retry the last transmit, or get a new one.
1369            let t = match self.buffered_transmit.take() {
1370                Some(t) => t,
1371                None => {
1372                    self.send_buffer.clear();
1373                    match self
1374                        .inner
1375                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1376                    {
1377                        Some(t) => {
1378                            transmits += match t.segment_size {
1379                                None => 1,
1380                                Some(s) => t.size.div_ceil(s), // round up
1381                            };
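                            // e.g. a 4800-byte GSO transmit with 1200-byte segments
                            // counts as 4 datagrams towards the MAX_TRANSMIT_DATAGRAMS budget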
1382                            t
1383                        }
1384                        None => break,
1385                    }
1386                }
1387            };
1388
1389            let len = t.size;
1390            match self
1391                .sender
1392                .as_mut()
1393                .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx)
1394            {
1395                Poll::Pending => {
1396                    self.buffered_transmit = Some(t);
1397                    return Ok(false);
1398                }
1399                Poll::Ready(Err(e)) => return Err(e),
1400                Poll::Ready(Ok(())) => {}
1401            }
1402
1403            if transmits >= MAX_TRANSMIT_DATAGRAMS {
1404                // TODO: This is not ideal yet: if we stop before polling all the
1405                // datagrams that could be sent, we never enter the `app_limited`
1406                // state, so CWND keeps growing until we hit this limit again.
1407                // See https://github.com/quinn-rs/quinn/issues/1126
1408                return Ok(true);
1409            }
1410        }
1411
1412        Ok(false)
1413    }
1414
1415    fn forward_endpoint_events(&mut self) {
1416        while let Some(event) = self.inner.poll_endpoint_events() {
1417            // If the endpoint driver is gone, noop.
1418            let _ = self.endpoint_events.send((self.handle, event));
1419        }
1420    }
1421
1422    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1423    fn process_conn_events(
1424        &mut self,
1425        shared: &Shared,
1426        cx: &mut Context,
1427    ) -> Result<(), ConnectionError> {
1428        loop {
1429            match self.conn_events.poll_recv(cx) {
1430                Poll::Ready(Some(ConnectionEvent::Rebind(sender))) => {
1431                    self.sender = sender;
1432                    self.inner.local_address_changed();
1433                }
1434                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1435                    self.inner.handle_event(event);
1436                }
1437                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1438                    self.close(error_code, reason, shared);
1439                }
1440                Poll::Ready(None) => {
1441                    return Err(ConnectionError::TransportError(TransportError::new(
1442                        TransportErrorCode::INTERNAL_ERROR,
1443                        "endpoint driver future was dropped".to_string(),
1444                    )));
1445                }
1446                Poll::Pending => {
1447                    return Ok(());
1448                }
1449            }
1450        }
1451    }
1452
1453    fn forward_app_events(&mut self, shared: &Shared) {
1454        while let Some(event) = self.inner.poll() {
1455            use proto::Event::*;
1456            match event {
1457                HandshakeDataReady => {
1458                    if let Some(x) = self.on_handshake_data.take() {
1459                        let _ = x.send(());
1460                    }
1461                }
1462                Connected => {
1463                    self.connected = true;
1464                    if let Some(x) = self.on_connected.take() {
1465                        // We don't care if the on-connected future was dropped
1466                        let _ = x.send(self.inner.accepted_0rtt());
1467                    }
1468                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1469                        // Wake up rejected 0-RTT streams so they can fail immediately with
1470                        // `ZeroRttRejected` errors.
1471                        wake_all(&mut self.blocked_writers);
1472                        wake_all(&mut self.blocked_readers);
1473                        wake_all_notify(&mut self.stopped);
1474                    }
1475                }
1476                HandshakeConfirmed => {
1477                    self.handshake_confirmed = true;
1478                    shared.handshake_confirmed.notify_waiters();
1479                }
1480                ConnectionLost { reason } => {
1481                    self.terminate(reason, shared);
1482                }
1483                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1484                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1485                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1486                }
1487                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1488                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1489                }
1490                DatagramReceived => {
1491                    shared.datagram_received.notify_waiters();
1492                }
1493                DatagramsUnblocked => {
1494                    shared.datagrams_unblocked.notify_waiters();
1495                }
1496                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1497                Stream(StreamEvent::Available { dir }) => {
1498                    // Might mean any number of streams are ready, so we wake up everyone
1499                    shared.stream_budget_available[dir as usize].notify_waiters();
1500                }
1501                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1502                Stream(StreamEvent::Stopped { id, .. }) => {
1503                    wake_stream_notify(id, &mut self.stopped);
1504                    wake_stream(id, &mut self.blocked_writers);
1505                }
1506                Path(ref evt @ PathEvent::ObservedAddr { addr: observed, .. }) => {
1507                    self.path_events.send(evt.clone()).ok();
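                    // Only wake watch subscribers when the reported address actually
                    // changed: `send_if_modified` suppresses notifications for repeated
                    // reports of the same external address.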
1508                    self.observed_external_addr.send_if_modified(|addr| {
1509                        let old = addr.replace(observed);
1510                        old != *addr
1511                    });
1512                }
1513                Path(ref evt @ PathEvent::Opened { id }) => {
1514                    self.path_events.send(evt.clone()).ok();
1515                    if let Some(sender) = self.open_path.remove(&id) {
1516                        sender.send_modify(|value| *value = Ok(()));
1517                    }
1518                }
1519                Path(ref evt @ PathEvent::Closed { id, error_code }) => {
1520                    self.path_events.send(evt.clone()).ok();
1521                    if let Some(sender) = self.close_path.remove(&id) {
1522                        let _ = sender.send(error_code);
1523                    }
1524                }
1525                Path(evt @ PathEvent::Abandoned { .. }) => {
1526                    self.path_events.send(evt).ok();
1527                }
1528                Path(ref evt @ PathEvent::LocallyClosed { id, error }) => {
1529                    self.path_events.send(evt.clone()).ok();
1530                    if let Some(sender) = self.open_path.remove(&id) {
1531                        sender.send_modify(|value| *value = Err(error));
1532                    }
1533                    // Note: this event also fires for paths that had already finished opening
1534                }
1535                Path(evt @ PathEvent::RemoteStatus { .. }) => {
1536                    self.path_events.send(evt).ok();
1537                }
1538                NatTraversal(update) => {
1539                    self.nat_traversal_updates.send(update).ok();
1540                }
1541            }
1542        }
1543    }
1544
1545    fn drive_timer(&mut self, cx: &mut Context) -> bool {
1546        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1547        // timer is registered with the runtime (and check whether it's already
1548        // expired).
1549        match self.inner.poll_timeout() {
1550            Some(deadline) => {
1551                if let Some(delay) = &mut self.timer {
1552                    // There is no need to reset the timer if the deadline
1553                    // did not change
1554                    if self
1555                        .timer_deadline
1556                        .map(|current_deadline| current_deadline != deadline)
1557                        .unwrap_or(true)
1558                    {
1559                        delay.as_mut().reset(deadline);
1560                    }
1561                } else {
1562                    self.timer = Some(self.runtime.new_timer(deadline));
1563                }
1564                // Store the actual expiration time of the timer
1565                self.timer_deadline = Some(deadline);
1566            }
1567            None => {
1568                self.timer_deadline = None;
1569                return false;
1570            }
1571        }
1572
1573        if self.timer_deadline.is_none() {
1574            return false;
1575        }
1576
1577        let delay = self
1578            .timer
1579            .as_mut()
1580            .expect("timer must exist in this state")
1581            .as_mut();
1582        if delay.poll(cx).is_pending() {
1583            // Since there wasn't a timeout event, there is nothing new
1584            // for the connection to do
1585            return false;
1586        }
1587
1588        // A timer expired, so the caller needs to check for
1589        // new transmits, which might cause new timers to be set.
1590        self.inner.handle_timeout(self.runtime.now());
1591        self.timer_deadline = None;
1592        true
1593    }
1594
1595    /// Wake up a blocked `ConnectionDriver` task to process I/O
1596    pub(crate) fn wake(&mut self) {
1597        if let Some(x) = self.driver.take() {
1598            x.wake();
1599        }
1600    }
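
    // A minimal sketch of the lock / mutate / wake pattern that the handle-side methods
    // elsewhere in this file pair with `wake` (the method name, lock label, and call below
    // are illustrative, not exact call sites):
    //
    //     let mut state = conn.state.lock("send_datagram");
    //     state.inner.datagrams().send(data, true)?; // mutate the proto state machine
    //     state.wake();                              // tell the driver to flush the result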
1601
1602    /// Used to wake up all blocked futures when the connection becomes closed for any reason
1603    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1604        self.error = Some(reason.clone());
1605        if let Some(x) = self.on_handshake_data.take() {
1606            let _ = x.send(());
1607        }
1608        wake_all(&mut self.blocked_writers);
1609        wake_all(&mut self.blocked_readers);
1610        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1611        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1612        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1613        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1614        shared.datagram_received.notify_waiters();
1615        shared.datagrams_unblocked.notify_waiters();
1616        if let Some(x) = self.on_connected.take() {
1617            let _ = x.send(false);
1618        }
1619        shared.handshake_confirmed.notify_waiters();
1620        wake_all_notify(&mut self.stopped);
1621        shared.closed.notify_waiters();
1622
1623        // Deliver the close reason and final stats to any registered on_closed listeners.
1624        let stats = self.inner.stats();
1625        for tx in self.on_closed.drain(..) {
1626            tx.send((reason.clone(), stats.clone())).ok();
1627        }
1628    }
1629
1630    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1631        self.inner.close(self.runtime.now(), error_code, reason);
1632        self.terminate(ConnectionError::LocallyClosed, shared);
1633        self.wake();
1634    }
1635
1636    /// Close for a reason other than the application's explicit request
1637    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1638        self.close(0u32.into(), Bytes::new(), shared);
1639    }
1640
1641    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1642        if self.inner.is_handshaking()
1643            || self.inner.accepted_0rtt()
1644            || self.inner.side().is_server()
1645        {
1646            Ok(())
1647        } else {
1648            Err(())
1649        }
1650    }
1651}
1652
1653impl Drop for State {
1654    fn drop(&mut self) {
1655        if !self.inner.is_drained() {
1656            // Ensure the endpoint can tidy up
1657            let _ = self
1658                .endpoint_events
1659                .send((self.handle, proto::EndpointEvent::drained()));
1660        }
1661
1662        if !self.on_closed.is_empty() {
1663            // Ensure that all on_closed oneshot senders are triggered before dropping.
1664            let reason = self.error.as_ref().expect("closed without error reason");
1665            let stats = self.inner.stats();
1666            for tx in self.on_closed.drain(..) {
1667                tx.send((reason.clone(), stats.clone())).ok();
1668            }
1669        }
1670    }
1671}
1672
1673impl fmt::Debug for State {
1674    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1675        f.debug_struct("State").field("inner", &self.inner).finish()
1676    }
1677}
1678
1679fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1680    if let Some(waker) = wakers.remove(&stream_id) {
1681        waker.wake();
1682    }
1683}
1684
1685fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1686    wakers.drain().for_each(|(_, waker)| waker.wake())
1687}
1688
1689fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1690    if let Some(notify) = wakers.remove(&stream_id) {
1691        notify.notify_waiters()
1692    }
1693}
1694
1695fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1696    wakers
1697        .drain()
1698        .for_each(|(_, notify)| notify.notify_waiters())
1699}
1700
1701/// Errors that can arise when sending a datagram
1702#[derive(Debug, Error, Clone, Eq, PartialEq)]
1703pub enum SendDatagramError {
1704    /// The peer does not support receiving datagram frames
1705    #[error("datagrams not supported by peer")]
1706    UnsupportedByPeer,
1707    /// Datagram support is disabled locally
1708    #[error("datagram support disabled")]
1709    Disabled,
1710    /// The datagram is larger than the connection can currently accommodate
1711    ///
1712    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1713    /// exceeded.
1714    #[error("datagram too large")]
1715    TooLarge,
1716    /// The connection was lost
1717    #[error("connection lost")]
1718    ConnectionLost(#[from] ConnectionError),
1719}
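
// A minimal sketch of handling these variants on the application side, assuming the
// `send_datagram` and `max_datagram_size` accessors on `Connection` defined earlier in
// this file; the fallback strategy shown is purely illustrative:
//
//     match connection.send_datagram(payload) {
//         Ok(()) => {}
//         Err(SendDatagramError::TooLarge) => {
//             // Stay under the current limit, e.g. by chunking or using a stream.
//             let limit = connection.max_datagram_size();
//             tracing::debug!(?limit, "datagram too large, falling back to a stream");
//         }
//         Err(SendDatagramError::UnsupportedByPeer | SendDatagramError::Disabled) => {
//             // Datagrams are unavailable on this connection; use streams instead.
//         }
//         Err(SendDatagramError::ConnectionLost(e)) => return Err(e.into()),
//     }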
1720
1721/// The maximum number of datagrams produced in a single `drive_transmit` call
1722///
1723/// This limits the amount of CPU resources consumed by datagram generation,
1724/// and allows other tasks (like receiving ACKs) to run in between.
1725const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1726
1727/// The maximum number of datagrams sent in a single transmit
1728///
1729/// This can be lower than the platform's maximum capability, to avoid excessive
1730/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
1731/// that numbers around 10 are a good compromise.
1732const MAX_TRANSMIT_SEGMENTS: NonZeroUsize = NonZeroUsize::new(10).expect("known");
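
// Taken together: each `poll_transmit` batch contains at most MAX_TRANSMIT_SEGMENTS (10)
// datagrams and `drive_transmit` returns once it has counted MAX_TRANSMIT_DATAGRAMS (20),
// so with full GSO batches a single call sends at most two batches before yielding back
// to the connection driver.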