iroh_quinn/
connection.rs

1use std::{
2    any::Any,
3    fmt,
4    future::Future,
5    io,
6    net::{IpAddr, SocketAddr},
7    num::NonZeroUsize,
8    pin::Pin,
9    sync::{Arc, Weak},
10    task::{Context, Poll, Waker, ready},
11};
12
13use bytes::Bytes;
14use pin_project_lite::pin_project;
15use rustc_hash::FxHashMap;
16use thiserror::Error;
17use tokio::sync::{Notify, futures::Notified, mpsc, oneshot, watch};
18use tracing::{Instrument, Span, debug_span};
19
20use crate::{
21    ConnectionEvent, Duration, Instant, Path, VarInt,
22    endpoint::ensure_ipv6,
23    mutex::Mutex,
24    path::OpenPath,
25    recv_stream::RecvStream,
26    runtime::{AsyncTimer, Runtime, UdpSender},
27    send_stream::SendStream,
28    udp_transmit,
29};
30use proto::{
31    ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, FourTuple, PathError,
32    PathEvent, PathId, PathStats, PathStatus, Side, StreamEvent, StreamId, TransportError,
33    TransportErrorCode, congestion::Controller, iroh_hp,
34};
35
36/// In-progress connection attempt future
37#[derive(Debug)]
38pub struct Connecting {
39    conn: Option<ConnectionRef>,
40    connected: oneshot::Receiver<bool>,
41    handshake_data_ready: Option<oneshot::Receiver<()>>,
42}
43
44impl Connecting {
45    pub(crate) fn new(
46        handle: ConnectionHandle,
47        conn: proto::Connection,
48        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
49        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
50        sender: Pin<Box<dyn UdpSender>>,
51        runtime: Arc<dyn Runtime>,
52    ) -> Self {
53        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
54        let (on_connected_send, on_connected_recv) = oneshot::channel();
55
56        let conn = ConnectionRef(Arc::new(Arc::new(ConnectionInner {
57            state: Mutex::new(State::new(
58                conn,
59                handle,
60                endpoint_events,
61                conn_events,
62                on_handshake_data_send,
63                on_connected_send,
64                sender,
65                runtime.clone(),
66            )),
67            shared: Shared::default(),
68        })));
69
70        let driver = ConnectionDriver(conn.clone());
71        runtime.spawn(Box::pin(
72            async {
73                if let Err(e) = driver.await {
74                    tracing::error!("I/O error: {e}");
75                }
76            }
77            .instrument(Span::current()),
78        ));
79
80        Self {
81            conn: Some(conn),
82            connected: on_connected_recv,
83            handshake_data_ready: Some(on_handshake_data_recv),
84        }
85    }
86
87    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
88    ///
89    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
90    /// If so, the returned [`Connection`] can be used to send application data without waiting for
91    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
92    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
93    /// complete, at which point subsequently opened streams and written data will have full
94    /// cryptographic protection.
95    ///
96    /// ## Outgoing
97    ///
98    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
99    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
100    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
101    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
102    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
103    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
104    /// If it was rejected, the existence of streams opened and other application data sent prior
105    /// to the handshake completing will not be conveyed to the remote application, and local
106    /// operations on them will return `ZeroRttRejected` errors.
107    ///
108    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
109    /// relevant resumption state to be stored in the server, which servers may limit or lose for
110    /// various reasons including not persisting resumption state across server restarts.
111    ///
112    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
113    /// implementation's docs for 0-RTT pitfalls.
114    ///
115    /// ## Incoming
116    ///
117    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
118    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
119    ///
120    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
121    /// implementation's docs for 0-RTT pitfalls.
122    ///
123    /// ## Security
124    ///
125    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
126    /// replay attacks, and should therefore never invoke non-idempotent operations.
127    ///
128    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
129    /// before TLS client authentication has occurred, and should therefore not be used to send
130    /// data for which client authentication is being used.
131    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
132        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
133        // have to release it explicitly before returning `self` by value.
134        let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");
135
136        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
137        drop(conn);
138
139        if is_ok {
140            let conn = self.conn.take().unwrap();
141            Ok((Connection(conn), ZeroRttAccepted(self.connected)))
142        } else {
143            Err(self)
144        }
145    }
146
147    /// Parameters negotiated during the handshake
148    ///
149    /// The dynamic type returned is determined by the configured
150    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
151    /// be [`downcast`](Box::downcast) to a
152    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
153    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
154        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
155        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
156        // simple.
157        if let Some(x) = self.handshake_data_ready.take() {
158            let _ = x.await;
159        }
160        let conn = self.conn.as_ref().unwrap();
161        let inner = conn.state.lock("handshake");
162        inner
163            .inner
164            .crypto_session()
165            .handshake_data()
166            .ok_or_else(|| {
167                inner
168                    .error
169                    .clone()
170                    .expect("spurious handshake data ready notification")
171            })
172    }
173
174    /// The local IP address which was used when the peer established
175    /// the connection
176    ///
177    /// This can be different from the address the endpoint is bound to, in case
178    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
179    ///
180    /// This will return `None` for clients, or when the platform does not expose this
181    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
182    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
183    ///
184    /// Will panic if called after `poll` has returned `Ready`.
185    pub fn local_ip(&self) -> Option<IpAddr> {
186        let conn = self.conn.as_ref().unwrap();
187        let inner = conn.state.lock("local_ip");
188
189        inner
190            .inner
191            .network_path(PathId::ZERO)
192            .expect("path exists when connecting")
193            .local_ip
194    }
195
196    /// The peer's UDP addresses
197    ///
198    /// Will panic if called after `poll` has returned `Ready`.
199    pub fn remote_address(&self) -> SocketAddr {
200        let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
201        // TODO: another unwrap
202        conn_ref
203            .state
204            .lock("remote_address")
205            .inner
206            .network_path(PathId::ZERO)
207            .expect("path exists when connecting")
208            .remote
209    }
210}
211
212impl Future for Connecting {
213    type Output = Result<Connection, ConnectionError>;
214    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
215        Pin::new(&mut self.connected).poll(cx).map(|_| {
216            let conn = self.conn.take().unwrap();
217            let inner = conn.state.lock("connecting");
218            if inner.connected {
219                drop(inner);
220                Ok(Connection(conn))
221            } else {
222                Err(inner
223                    .error
224                    .clone()
225                    .expect("connected signaled without connection success or error"))
226            }
227        })
228    }
229}
230
231/// Future that completes when a connection is fully established
232///
233/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
234/// value is meaningless.
235pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
236
237impl Future for ZeroRttAccepted {
238    type Output = bool;
239    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
240        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
241    }
242}
243
244/// A future that drives protocol logic for a connection
245///
246/// This future handles the protocol logic for a single connection, routing events from the
247/// `Connection` API object to the `Endpoint` task and the related stream-related interfaces.
248/// It also keeps track of outstanding timeouts for the `Connection`.
249///
250/// If the connection encounters an error condition, this future will yield an error. It will
251/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
252/// connection-related futures, this waits for the draining period to complete to ensure that
253/// packets still in flight from the peer are handled gracefully.
254#[must_use = "connection drivers must be spawned for their connections to function"]
255#[derive(Debug)]
256struct ConnectionDriver(ConnectionRef);
257
258impl Future for ConnectionDriver {
259    type Output = Result<(), io::Error>;
260
261    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
262        let conn = &mut *self.0.state.lock("poll");
263
264        let span = debug_span!("drive", id = conn.handle.0);
265        let _guard = span.enter();
266
267        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
268            conn.terminate(e, &self.0.shared);
269            return Poll::Ready(Ok(()));
270        }
271        let mut keep_going = conn.drive_transmit(cx)?;
272        // If a timer expires, there might be more to transmit. When we transmit something, we
273        // might need to reset a timer. Hence, we must loop until neither happens.
274        keep_going |= conn.drive_timer(cx);
275        conn.forward_endpoint_events();
276        conn.forward_app_events(&self.0.shared);
277
278        if !conn.inner.is_drained() {
279            if keep_going {
280                // If the connection hasn't processed all tasks, schedule it again
281                cx.waker().wake_by_ref();
282            } else {
283                conn.driver = Some(cx.waker().clone());
284            }
285            return Poll::Pending;
286        }
287        if conn.error.is_none() {
288            unreachable!("drained connections always have an error");
289        }
290        Poll::Ready(Ok(()))
291    }
292}
293
294/// A QUIC connection.
295///
296/// If all references to a connection (including every clone of the `Connection` handle, streams of
297/// incoming streams, and the various stream types) have been dropped, then the connection will be
298/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
299/// connection explicitly by calling [`Connection::close()`].
300///
301/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
302/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
303/// application. [`Connection::close()`] describes in more detail how to gracefully close a
304/// connection without losing application data.
305///
306/// May be cloned to obtain another handle to the same connection.
307///
308/// [`Connection::close()`]: Connection::close
309#[derive(Debug, Clone)]
310pub struct Connection(ConnectionRef);
311
312impl Connection {
313    /// Returns a weak reference to the inner connection struct.
314    pub fn weak_handle(&self) -> WeakConnectionHandle {
315        WeakConnectionHandle(Arc::downgrade(&self.0.0))
316    }
317
318    /// Initiate a new outgoing unidirectional stream.
319    ///
320    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
321    /// consequence, the peer won't be notified that a stream has been opened until the stream is
322    /// actually used.
323    pub fn open_uni(&self) -> OpenUni<'_> {
324        OpenUni {
325            conn: &self.0,
326            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
327        }
328    }
329
330    /// Initiate a new outgoing bidirectional stream.
331    ///
332    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
333    /// consequence, the peer won't be notified that a stream has been opened until the stream is
334    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
335    /// anything to [`SendStream`] will never succeed.
336    ///
337    /// [`open_bi()`]: crate::Connection::open_bi
338    /// [`SendStream`]: crate::SendStream
339    /// [`RecvStream`]: crate::RecvStream
340    pub fn open_bi(&self) -> OpenBi<'_> {
341        OpenBi {
342            conn: &self.0,
343            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
344        }
345    }
346
347    /// Accept the next incoming uni-directional stream
348    pub fn accept_uni(&self) -> AcceptUni<'_> {
349        AcceptUni {
350            conn: &self.0,
351            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
352        }
353    }
354
355    /// Accept the next incoming bidirectional stream
356    ///
357    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
358    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
359    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
360    ///
361    /// [`accept_bi()`]: crate::Connection::accept_bi
362    /// [`open_bi()`]: crate::Connection::open_bi
363    /// [`SendStream`]: crate::SendStream
364    /// [`RecvStream`]: crate::RecvStream
365    pub fn accept_bi(&self) -> AcceptBi<'_> {
366        AcceptBi {
367            conn: &self.0,
368            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
369        }
370    }
371
372    /// Receive an application datagram
373    pub fn read_datagram(&self) -> ReadDatagram<'_> {
374        ReadDatagram {
375            conn: &self.0,
376            notify: self.0.shared.datagram_received.notified(),
377        }
378    }
379
380    /// Opens a new path if no path exists yet for the remote address.
381    ///
382    /// Otherwise behaves exactly as [`open_path`].
383    ///
384    /// [`open_path`]: Self::open_path
385    pub fn open_path_ensure(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath {
386        let mut state = self.0.state.lock("open_path");
387
388        // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
389        // If not, we do not support IPv6.  We can not access endpoint::State from here
390        // however, but either all our paths use an IPv6 address, or all our paths use an
391        // IPv4 address.  So we can use that information.
392        let ipv6 = state
393            .inner
394            .paths()
395            .iter()
396            .filter_map(|id| {
397                state
398                    .inner
399                    .network_path(*id)
400                    .map(|addrs| addrs.remote.is_ipv6())
401                    .ok()
402            })
403            .next()
404            .unwrap_or_default();
405        if addr.is_ipv6() && !ipv6 {
406            return OpenPath::rejected(PathError::InvalidRemoteAddress(addr));
407        }
408        let addr = if ipv6 {
409            SocketAddr::V6(ensure_ipv6(addr))
410        } else {
411            addr
412        };
413
414        let now = state.runtime.now();
415        // TODO(matheus23): For now this means it's impossible to make use of short-circuiting path validation currently.
416        // However, changing that would mean changing the API.
417        let addrs = FourTuple {
418            remote: addr,
419            local_ip: None,
420        };
421        let open_res = state.inner.open_path_ensure(addrs, initial_status, now);
422        state.wake();
423        match open_res {
424            Ok((path_id, existed)) if existed => {
425                match state.open_path.get(&path_id).map(|tx| tx.subscribe()) {
426                    Some(recv) => OpenPath::new(path_id, recv, self.0.clone()),
427                    None => OpenPath::ready(path_id, self.0.clone()),
428                }
429            }
430            Ok((path_id, _)) => {
431                let (tx, rx) = watch::channel(Ok(()));
432                state.open_path.insert(path_id, tx);
433                drop(state);
434                OpenPath::new(path_id, rx, self.0.clone())
435            }
436            Err(err) => OpenPath::rejected(err),
437        }
438    }
439
440    /// Opens an additional path if the multipath extension is negotiated.
441    ///
442    /// The returned future completes once the path is either fully opened and ready to
443    /// carry application data, or if there was an error.
444    ///
445    /// Dropping the returned future does not cancel the opening of the path, the
446    /// [`PathEvent::Opened`] event will still be emitted from [`Self::path_events`] if the
447    /// path opens.  The [`PathId`] for the events can be extracted from
448    /// [`OpenPath::path_id`].
449    ///
450    /// Failure to open a path can either occur immediately, before polling the returned
451    /// future, or at a later time.  If the failure is immediate [`OpenPath::path_id`] will
452    /// return `None` and the future will be ready immediately.  If the failure happens
453    /// later, a [`PathEvent`] will be emitted.
454    pub fn open_path(&self, addr: SocketAddr, initial_status: PathStatus) -> OpenPath {
455        let mut state = self.0.state.lock("open_path");
456
457        // If endpoint::State::ipv6 is true we want to keep all our IP addresses as IPv6.
458        // If not, we do not support IPv6.  We can not access endpoint::State from here
459        // however, but either all our paths use an IPv6 address, or all our paths use an
460        // IPv4 address.  So we can use that information.
461        let ipv6 = state
462            .inner
463            .paths()
464            .iter()
465            .filter_map(|id| {
466                state
467                    .inner
468                    .network_path(*id)
469                    .map(|addrs| addrs.remote.is_ipv6())
470                    .ok()
471            })
472            .next()
473            .unwrap_or_default();
474        if addr.is_ipv6() && !ipv6 {
475            return OpenPath::rejected(PathError::InvalidRemoteAddress(addr));
476        }
477        let addr = if ipv6 {
478            SocketAddr::V6(ensure_ipv6(addr))
479        } else {
480            addr
481        };
482
483        let (on_open_path_send, on_open_path_recv) = watch::channel(Ok(()));
484        let now = state.runtime.now();
485        // TODO(matheus23): For now this means it's impossible to make use of short-circuiting path validation currently.
486        // However, changing that would mean changing the API.
487        let addrs = FourTuple {
488            remote: addr,
489            local_ip: None,
490        };
491        let open_res = state.inner.open_path(addrs, initial_status, now);
492        state.wake();
493        match open_res {
494            Ok(path_id) => {
495                state.open_path.insert(path_id, on_open_path_send);
496                drop(state);
497                OpenPath::new(path_id, on_open_path_recv, self.0.clone())
498            }
499            Err(err) => OpenPath::rejected(err),
500        }
501    }
502
503    /// Returns the [`Path`] structure of an open path
504    pub fn path(&self, id: PathId) -> Option<Path> {
505        // TODO(flub): Using this to know if the path still exists is... hacky.
506        self.0.state.lock("path").inner.path_status(id).ok()?;
507        Some(Path {
508            id,
509            conn: self.0.clone(),
510        })
511    }
512
513    /// A broadcast receiver of [`PathEvent`]s for all paths in this connection
514    pub fn path_events(&self) -> tokio::sync::broadcast::Receiver<PathEvent> {
515        self.0.state.lock("path_events").path_events.subscribe()
516    }
517
518    /// A broadcast receiver of [`iroh_hp::Event`]s for updates about server addresses
519    pub fn nat_traversal_updates(&self) -> tokio::sync::broadcast::Receiver<iroh_hp::Event> {
520        self.0
521            .state
522            .lock("nat_traversal_updates")
523            .nat_traversal_updates
524            .subscribe()
525    }
526
527    /// Wait for the connection to be closed for any reason
528    ///
529    /// Despite the return type's name, closed connections are often not an error condition at the
530    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
531    /// and [`ConnectionError::ApplicationClosed`].
532    pub async fn closed(&self) -> ConnectionError {
533        {
534            let conn = self.0.state.lock("closed");
535            if let Some(error) = conn.error.as_ref() {
536                return error.clone();
537            }
538            // Construct the future while the lock is held to ensure we can't miss a wakeup if
539            // the `Notify` is signaled immediately after we release the lock. `await` it after
540            // the lock guard is out of scope.
541            self.0.shared.closed.notified()
542        }
543        .await;
544        self.0
545            .state
546            .lock("closed")
547            .error
548            .as_ref()
549            .expect("closed without an error")
550            .clone()
551    }
552
553    /// Wait for the connection to be closed without keeping a strong reference to the connection
554    ///
555    /// Returns a future that resolves, once the connection is closed, to a tuple of
556    /// ([`ConnectionError`], [`ConnectionStats`]).
557    ///
558    /// Calling [`Self::closed`] keeps the connection alive until it is either closed locally via [`Connection::close`]
559    /// or closed by the remote peer. This function instead does not keep the connection itself alive,
560    /// so if all *other* clones of the connection are dropped, the connection will be closed implicitly even
561    /// if there are futures returned from this function still being awaited.
562    pub fn on_closed(&self) -> OnClosed {
563        let (tx, rx) = oneshot::channel();
564        self.0.state.lock("on_closed").on_closed.push(tx);
565        OnClosed {
566            conn: self.weak_handle(),
567            rx,
568        }
569    }
570
571    /// If the connection is closed, the reason why.
572    ///
573    /// Returns `None` if the connection is still open.
574    pub fn close_reason(&self) -> Option<ConnectionError> {
575        self.0.state.lock("close_reason").error.clone()
576    }
577
578    /// Close the connection immediately.
579    ///
580    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
581    /// more data is sent to the peer and the peer may drop buffered data upon receiving
582    /// the CONNECTION_CLOSE frame.
583    ///
584    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
585    ///
586    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
587    /// is preserved in full, it should be kept under 1KiB.
588    ///
589    /// # Gracefully closing a connection
590    ///
591    /// Only the peer last receiving application data can be certain that all data is
592    /// delivered. The only reliable action it can then take is to close the connection,
593    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
594    /// frame is very likely if both endpoints stay online long enough, and
595    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
596    /// remote peer will time out the connection, provided that the idle timeout is not
597    /// disabled.
598    ///
599    /// The sending side can not guarantee all stream data is delivered to the remote
600    /// application. It only knows the data is delivered to the QUIC stack of the remote
601    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
602    /// [`close()`] the remote endpoint may drop any data it received but is as yet
603    /// undelivered to the application, including data that was acknowledged as received to
604    /// the local endpoint.
605    ///
606    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
607    /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
608    /// [`close()`]: Connection::close
609    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
610        let conn = &mut *self.0.state.lock("close");
611        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
612    }
613
614    /// Wait for the handshake to be confirmed.
615    ///
616    /// As a server, who must be authenticated by clients,
617    /// this happens when the handshake completes
618    /// upon receiving a TLS Finished message from the client.
619    /// In return, the server send a HANDSHAKE_DONE frame.
620    ///
621    /// As a client, this happens when receiving a HANDSHAKE_DONE frame.
622    /// At this point, the server has either accepted our authentication,
623    /// or, if client authentication is not required, accepted our lack of authentication.
624    pub async fn handshake_confirmed(&self) -> Result<(), ConnectionError> {
625        {
626            let conn = self.0.state.lock("handshake_confirmed");
627            if let Some(error) = conn.error.as_ref() {
628                return Err(error.clone());
629            }
630            if conn.handshake_confirmed {
631                return Ok(());
632            }
633            // Construct the future while the lock is held to ensure we can't miss a wakeup if
634            // the `Notify` is signaled immediately after we release the lock. `await` it after
635            // the lock guard is out of scope.
636            self.0.shared.handshake_confirmed.notified()
637        }
638        .await;
639        if let Some(error) = self.0.state.lock("handshake_confirmed").error.as_ref() {
640            Err(error.clone())
641        } else {
642            Ok(())
643        }
644    }
645
646    /// Transmit `data` as an unreliable, unordered application datagram
647    ///
648    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
649    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
650    /// dictated by the peer.
651    ///
652    /// Previously queued datagrams which are still unsent may be discarded to make space for this
653    /// datagram, in order of oldest to newest.
654    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
655        let conn = &mut *self.0.state.lock("send_datagram");
656        if let Some(ref x) = conn.error {
657            return Err(SendDatagramError::ConnectionLost(x.clone()));
658        }
659        use proto::SendDatagramError::*;
660        match conn.inner.datagrams().send(data, true) {
661            Ok(()) => {
662                conn.wake();
663                Ok(())
664            }
665            Err(e) => Err(match e {
666                Blocked(..) => unreachable!(),
667                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
668                Disabled => SendDatagramError::Disabled,
669                TooLarge => SendDatagramError::TooLarge,
670            }),
671        }
672    }
673
674    /// Transmit `data` as an unreliable, unordered application datagram
675    ///
676    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
677    /// conditions, which effectively prioritizes old datagrams over new datagrams.
678    ///
679    /// See [`send_datagram()`] for details.
680    ///
681    /// [`send_datagram()`]: Connection::send_datagram
682    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
683        SendDatagram {
684            conn: &self.0,
685            data: Some(data),
686            notify: self.0.shared.datagrams_unblocked.notified(),
687        }
688    }
689
690    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
691    ///
692    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
693    ///
694    /// This may change over the lifetime of a connection according to variation in the path MTU
695    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
696    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
697    ///
698    /// Not necessarily the maximum size of received datagrams.
699    ///
700    /// [`send_datagram()`]: Connection::send_datagram
701    pub fn max_datagram_size(&self) -> Option<usize> {
702        self.0
703            .state
704            .lock("max_datagram_size")
705            .inner
706            .datagrams()
707            .max_size()
708    }
709
710    /// Bytes available in the outgoing datagram buffer
711    ///
712    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
713    /// at most this size is guaranteed not to cause older datagrams to be dropped.
714    pub fn datagram_send_buffer_space(&self) -> usize {
715        self.0
716            .state
717            .lock("datagram_send_buffer_space")
718            .inner
719            .datagrams()
720            .send_buffer_space()
721    }
722
723    /// The side of the connection (client or server)
724    pub fn side(&self) -> Side {
725        self.0.state.lock("side").inner.side()
726    }
727
728    /// The peer's UDP address
729    ///
730    /// If [`ServerConfig::migration`] is `true`, clients may change addresses at will,
731    /// e.g. when switching to a cellular internet connection.
732    ///
733    /// If [`multipath`] is enabled this will return the address of *any*
734    /// path, and may not be consistent. Prefer [`Path::remote_address`] instead.
735    ///
736    /// [`ServerConfig::migration`]: crate::ServerConfig::migration
737    /// [`multipath`]: crate::TransportConfig::max_concurrent_multipath_paths
738    pub fn remote_address(&self) -> SocketAddr {
739        // TODO: an unwrap again
740        let state = self.0.state.lock("remote_address");
741        state
742            .inner
743            .paths()
744            .iter()
745            .filter_map(|id| state.inner.network_path(*id).ok())
746            .next()
747            .unwrap()
748            .remote
749    }
750
751    /// The local IP address which was used when the peer established
752    /// the connection
753    ///
754    /// This can be different from the address the endpoint is bound to, in case
755    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
756    ///
757    /// This will return `None` for clients, or when the platform does not expose this
758    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
759    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
760    pub fn local_ip(&self) -> Option<IpAddr> {
761        // TODO: an unwrap again
762        let state = self.0.state.lock("remote_address");
763        state
764            .inner
765            .paths()
766            .iter()
767            .filter_map(|id| state.inner.network_path(*id).ok())
768            .next()
769            .unwrap()
770            .local_ip
771    }
772
773    /// Current best estimate of this connection's latency (round-trip-time)
774    pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
775        self.0.state.lock("rtt").inner.rtt(path_id)
776    }
777
778    /// Returns connection statistics
779    pub fn stats(&self) -> ConnectionStats {
780        self.0.state.lock("stats").inner.stats()
781    }
782
783    /// Returns path statistics
784    pub fn path_stats(&self, path_id: PathId) -> Option<PathStats> {
785        self.0.state.lock("path_stats").inner.path_stats(path_id)
786    }
787
788    /// Current state of the congestion control algorithm, for debugging purposes
789    pub fn congestion_state(&self, path_id: PathId) -> Option<Box<dyn Controller>> {
790        self.0
791            .state
792            .lock("congestion_state")
793            .inner
794            .congestion_state(path_id)
795            .map(|c| c.clone_box())
796    }
797
798    /// Parameters negotiated during the handshake
799    ///
800    /// Guaranteed to return `Some` on fully established connections or after
801    /// [`Connecting::handshake_data()`] succeeds. See that method's documentations for details on
802    /// the returned value.
803    ///
804    /// [`Connection::handshake_data()`]: crate::Connecting::handshake_data
805    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
806        self.0
807            .state
808            .lock("handshake_data")
809            .inner
810            .crypto_session()
811            .handshake_data()
812    }
813
814    /// Cryptographic identity of the peer
815    ///
816    /// The dynamic type returned is determined by the configured
817    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
818    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
819    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
820        self.0
821            .state
822            .lock("peer_identity")
823            .inner
824            .crypto_session()
825            .peer_identity()
826    }
827
828    /// A stable identifier for this connection
829    ///
830    /// Peer addresses and connection IDs can change, but this value will remain
831    /// fixed for the lifetime of the connection.
832    pub fn stable_id(&self) -> usize {
833        self.0.stable_id()
834    }
835
836    /// Update traffic keys spontaneously
837    ///
838    /// This primarily exists for testing purposes.
839    pub fn force_key_update(&self) {
840        self.0
841            .state
842            .lock("force_key_update")
843            .inner
844            .force_key_update()
845    }
846
847    /// Derive keying material from this connection's TLS session secrets.
848    ///
849    /// When both peers call this method with the same `label` and `context`
850    /// arguments and `output` buffers of equal length, they will get the
851    /// same sequence of bytes in `output`. These bytes are cryptographically
852    /// strong and pseudorandom, and are suitable for use as keying material.
853    ///
854    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
855    pub fn export_keying_material(
856        &self,
857        output: &mut [u8],
858        label: &[u8],
859        context: &[u8],
860    ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
861        self.0
862            .state
863            .lock("export_keying_material")
864            .inner
865            .crypto_session()
866            .export_keying_material(output, label, context)
867    }
868
869    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
870    ///
871    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
872    /// `count`s increase both minimum and worst-case memory consumption.
873    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
874        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
875        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
876        // May need to send MAX_STREAMS to make progress
877        conn.wake();
878    }
879
880    /// See [`proto::TransportConfig::send_window()`]
881    pub fn set_send_window(&self, send_window: u64) {
882        let mut conn = self.0.state.lock("set_send_window");
883        conn.inner.set_send_window(send_window);
884        conn.wake();
885    }
886
887    /// See [`proto::TransportConfig::receive_window()`]
888    pub fn set_receive_window(&self, receive_window: VarInt) {
889        let mut conn = self.0.state.lock("set_receive_window");
890        conn.inner.set_receive_window(receive_window);
891        conn.wake();
892    }
893
894    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
895    ///
896    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
897    /// `count`s increase both minimum and worst-case memory consumption.
898    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
899        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
900        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
901        // May need to send MAX_STREAMS to make progress
902        conn.wake();
903    }
904
905    /// Track changed on our external address as reported by the peer.
906    pub fn observed_external_addr(&self) -> watch::Receiver<Option<SocketAddr>> {
907        let conn = self.0.state.lock("external_addr");
908        conn.observed_external_addr.subscribe()
909    }
910
911    /// Is multipath enabled?
912    // TODO(flub): not a useful API, once we do real things with multipath we can remove
913    // this again.
914    pub fn is_multipath_enabled(&self) -> bool {
915        let conn = self.0.state.lock("is_multipath_enabled");
916        conn.inner.is_multipath_negotiated()
917    }
918
919    /// Registers one address at which this endpoint might be reachable
920    ///
921    /// When the NAT traversal extension is negotiated, servers send these addresses to clients in
922    /// `ADD_ADDRESS` frames. This allows clients to obtain server address candidates to initiate
923    /// NAT traversal attempts. Clients provide their own reachable addresses in `REACH_OUT` frames
924    /// when [`Self::initiate_nat_traversal_round`] is called.
925    pub fn add_nat_traversal_address(&self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
926        let mut conn = self.0.state.lock("add_nat_traversal_addresses");
927        conn.inner.add_nat_traversal_address(address)
928    }
929
930    /// Removes one or more addresses from the set of addresses at which this endpoint is reachable
931    ///
932    /// When the NAT traversal extension is negotiated, servers send address removals to
933    /// clients in `REMOVE_ADDRESS` frames. This allows clients to stop using outdated
934    /// server address candidates that are no longer valid for NAT traversal.
935    ///
936    /// For clients, removed addresses will no longer be advertised in `REACH_OUT` frames.
937    ///
938    /// Addresses not present in the set will be silently ignored.
939    pub fn remove_nat_traversal_address(&self, address: SocketAddr) -> Result<(), iroh_hp::Error> {
940        let mut conn = self.0.state.lock("remove_nat_traversal_addresses");
941        conn.inner.remove_nat_traversal_address(address)
942    }
943
944    /// Get the current local nat traversal addresses
945    pub fn get_local_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
946        let conn = self.0.state.lock("get_local_nat_traversal_addresses");
947        conn.inner.get_local_nat_traversal_addresses()
948    }
949
950    /// Get the currently advertised nat traversal addresses by the server
951    pub fn get_remote_nat_traversal_addresses(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
952        let conn = self.0.state.lock("get_remote_nat_traversal_addresses");
953        conn.inner.get_remote_nat_traversal_addresses()
954    }
955
956    /// Initiates a new nat traversal round
957    ///
958    /// A nat traversal round involves advertising the client's local addresses in `REACH_OUT`
959    /// frames, and initiating probing of the known remote addresses. When a new round is
960    /// initiated, the previous one is cancelled, and paths that have not been opened are closed.
961    ///
962    /// Returns the server addresses that are now being probed.
963    pub fn initiate_nat_traversal_round(&self) -> Result<Vec<SocketAddr>, iroh_hp::Error> {
964        let mut conn = self.0.state.lock("initiate_nat_traversal_round");
965        let now = conn.runtime.now();
966        conn.inner.initiate_nat_traversal_round(now)
967    }
968}
969
970pin_project! {
971    /// Future produced by [`Connection::open_uni`]
972    pub struct OpenUni<'a> {
973        conn: &'a ConnectionRef,
974        #[pin]
975        notify: Notified<'a>,
976    }
977}
978
979impl Future for OpenUni<'_> {
980    type Output = Result<SendStream, ConnectionError>;
981    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
982        let this = self.project();
983        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
984        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
985    }
986}
987
988pin_project! {
989    /// Future produced by [`Connection::open_bi`]
990    pub struct OpenBi<'a> {
991        conn: &'a ConnectionRef,
992        #[pin]
993        notify: Notified<'a>,
994    }
995}
996
997impl Future for OpenBi<'_> {
998    type Output = Result<(SendStream, RecvStream), ConnectionError>;
999    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1000        let this = self.project();
1001        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
1002
1003        Poll::Ready(Ok((
1004            SendStream::new(conn.clone(), id, is_0rtt),
1005            RecvStream::new(conn, id, is_0rtt),
1006        )))
1007    }
1008}
1009
1010fn poll_open<'a>(
1011    ctx: &mut Context<'_>,
1012    conn: &'a ConnectionRef,
1013    mut notify: Pin<&mut Notified<'a>>,
1014    dir: Dir,
1015) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1016    let mut state = conn.state.lock("poll_open");
1017    if let Some(ref e) = state.error {
1018        return Poll::Ready(Err(e.clone()));
1019    } else if let Some(id) = state.inner.streams().open(dir) {
1020        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
1021        drop(state); // Release the lock so clone can take it
1022        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1023    }
1024    loop {
1025        match notify.as_mut().poll(ctx) {
1026            // `state` lock ensures we didn't race with readiness
1027            Poll::Pending => return Poll::Pending,
1028            // Spurious wakeup, get a new future
1029            Poll::Ready(()) => {
1030                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
1031            }
1032        }
1033    }
1034}
1035
1036pin_project! {
1037    /// Future produced by [`Connection::accept_uni`]
1038    pub struct AcceptUni<'a> {
1039        conn: &'a ConnectionRef,
1040        #[pin]
1041        notify: Notified<'a>,
1042    }
1043}
1044
1045impl Future for AcceptUni<'_> {
1046    type Output = Result<RecvStream, ConnectionError>;
1047
1048    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1049        let this = self.project();
1050        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
1051        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
1052    }
1053}
1054
1055pin_project! {
1056    /// Future produced by [`Connection::accept_bi`]
1057    pub struct AcceptBi<'a> {
1058        conn: &'a ConnectionRef,
1059        #[pin]
1060        notify: Notified<'a>,
1061    }
1062}
1063
1064impl Future for AcceptBi<'_> {
1065    type Output = Result<(SendStream, RecvStream), ConnectionError>;
1066
1067    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1068        let this = self.project();
1069        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
1070        Poll::Ready(Ok((
1071            SendStream::new(conn.clone(), id, is_0rtt),
1072            RecvStream::new(conn, id, is_0rtt),
1073        )))
1074    }
1075}
1076
1077fn poll_accept<'a>(
1078    ctx: &mut Context<'_>,
1079    conn: &'a ConnectionRef,
1080    mut notify: Pin<&mut Notified<'a>>,
1081    dir: Dir,
1082) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
1083    let mut state = conn.state.lock("poll_accept");
1084    // Check for incoming streams before checking `state.error` so that already-received streams,
1085    // which are necessarily finite, can be drained from a closed connection.
1086    if let Some(id) = state.inner.streams().accept(dir) {
1087        let is_0rtt = state.inner.is_handshaking();
1088        state.wake(); // To send additional stream ID credit
1089        drop(state); // Release the lock so clone can take it
1090        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
1091    } else if let Some(ref e) = state.error {
1092        return Poll::Ready(Err(e.clone()));
1093    }
1094    loop {
1095        match notify.as_mut().poll(ctx) {
1096            // `state` lock ensures we didn't race with readiness
1097            Poll::Pending => return Poll::Pending,
1098            // Spurious wakeup, get a new future
1099            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
1100        }
1101    }
1102}
1103
1104pin_project! {
1105    /// Future produced by [`Connection::read_datagram`]
1106    pub struct ReadDatagram<'a> {
1107        conn: &'a ConnectionRef,
1108        #[pin]
1109        notify: Notified<'a>,
1110    }
1111}
1112
1113impl Future for ReadDatagram<'_> {
1114    type Output = Result<Bytes, ConnectionError>;
1115    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1116        let mut this = self.project();
1117        let mut state = this.conn.state.lock("ReadDatagram::poll");
1118        // Check for buffered datagrams before checking `state.error` so that already-received
1119        // datagrams, which are necessarily finite, can be drained from a closed connection.
1120        if let Some(x) = state.inner.datagrams().recv() {
1121            return Poll::Ready(Ok(x));
1122        } else if let Some(ref e) = state.error {
1123            return Poll::Ready(Err(e.clone()));
1124        }
1125        loop {
1126            match this.notify.as_mut().poll(ctx) {
1127                // `state` lock ensures we didn't race with readiness
1128                Poll::Pending => return Poll::Pending,
1129                // Spurious wakeup, get a new future
1130                Poll::Ready(()) => this
1131                    .notify
1132                    .set(this.conn.shared.datagram_received.notified()),
1133            }
1134        }
1135    }
1136}
1137
1138pin_project! {
1139    /// Future produced by [`Connection::send_datagram_wait`]
1140    pub struct SendDatagram<'a> {
1141        conn: &'a ConnectionRef,
1142        data: Option<Bytes>,
1143        #[pin]
1144        notify: Notified<'a>,
1145    }
1146}
1147
1148impl Future for SendDatagram<'_> {
1149    type Output = Result<(), SendDatagramError>;
1150    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
1151        let mut this = self.project();
1152        let mut state = this.conn.state.lock("SendDatagram::poll");
1153        if let Some(ref e) = state.error {
1154            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
1155        }
1156        use proto::SendDatagramError::*;
1157        match state
1158            .inner
1159            .datagrams()
1160            .send(this.data.take().unwrap(), false)
1161        {
1162            Ok(()) => {
1163                state.wake();
1164                Poll::Ready(Ok(()))
1165            }
1166            Err(e) => Poll::Ready(Err(match e {
1167                Blocked(data) => {
1168                    this.data.replace(data);
1169                    loop {
1170                        match this.notify.as_mut().poll(ctx) {
1171                            Poll::Pending => return Poll::Pending,
1172                            // Spurious wakeup, get a new future
1173                            Poll::Ready(()) => this
1174                                .notify
1175                                .set(this.conn.shared.datagrams_unblocked.notified()),
1176                        }
1177                    }
1178                }
1179                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
1180                Disabled => SendDatagramError::Disabled,
1181                TooLarge => SendDatagramError::TooLarge,
1182            })),
1183        }
1184    }
1185}
1186
1187/// Future returned by [`Connection::on_closed`]
1188///
1189/// Resolves to a tuple of ([`ConnectionError`], [`ConnectionStats`]).
1190pub struct OnClosed {
1191    rx: oneshot::Receiver<(ConnectionError, ConnectionStats)>,
1192    conn: WeakConnectionHandle,
1193}
1194
1195impl Drop for OnClosed {
1196    fn drop(&mut self) {
1197        if self.rx.is_terminated() {
1198            return;
1199        };
1200        if let Some(conn) = self.conn.upgrade() {
1201            self.rx.close();
1202            conn.0
1203                .state
1204                .lock("OnClosed::drop")
1205                .on_closed
1206                .retain(|tx| !tx.is_closed());
1207        }
1208    }
1209}
1210
1211impl Future for OnClosed {
1212    type Output = (ConnectionError, ConnectionStats);
1213
1214    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1215        let this = self.get_mut();
1216        // The `expect` is safe because `State::drop` ensures that all senders are triggered
1217        // before being dropped.
1218        Pin::new(&mut this.rx)
1219            .poll(cx)
1220            .map(|x| x.expect("on_close sender is never dropped before sending"))
1221    }
1222}
1223
1224#[derive(Debug)]
1225#[allow(clippy::redundant_allocation)]
1226pub(crate) struct ConnectionRef(Arc<Arc<ConnectionInner>>);
1227
1228impl ConnectionRef {
1229    #[allow(clippy::redundant_allocation)]
1230    fn from_arc(inner: Arc<Arc<ConnectionInner>>) -> Self {
1231        inner.state.lock("from_arc").ref_count += 1;
1232        Self(inner)
1233    }
1234
1235    fn stable_id(&self) -> usize {
1236        &*self.0 as *const _ as usize
1237    }
1238}
1239
1240impl Clone for ConnectionRef {
1241    fn clone(&self) -> Self {
1242        Self::from_arc(Arc::clone(&self.0))
1243    }
1244}
1245
1246impl Drop for ConnectionRef {
1247    fn drop(&mut self) {
1248        let conn = &mut *self.state.lock("drop");
1249        if let Some(x) = conn.ref_count.checked_sub(1) {
1250            conn.ref_count = x;
1251            if x == 0 && !conn.inner.is_closed() {
1252                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
1253                // not, we can't do any harm. If there were any streams being opened, then either
1254                // the connection will be closed for an unrelated reason or a fresh reference will
1255                // be constructed for the newly opened stream.
1256                conn.implicit_close(&self.shared);
1257            }
1258        }
1259    }
1260}
1261
1262impl std::ops::Deref for ConnectionRef {
1263    type Target = ConnectionInner;
1264    fn deref(&self) -> &Self::Target {
1265        &self.0
1266    }
1267}
1268
1269#[derive(Debug)]
1270pub(crate) struct ConnectionInner {
1271    pub(crate) state: Mutex<State>,
1272    pub(crate) shared: Shared,
1273}
1274
1275/// A handle to some connection internals, use with care.
1276///
1277/// This contains a weak reference to the connection so will not itself keep the connection
1278/// alive.
1279#[derive(Debug, Clone)]
1280pub struct WeakConnectionHandle(Weak<Arc<ConnectionInner>>);
1281
1282impl WeakConnectionHandle {
1283    /// Returns `true` if the [`Connection`] associated with this handle is still alive.
1284    pub fn is_alive(&self) -> bool {
1285        self.0.upgrade().is_some()
1286    }
1287
1288    /// Upgrade the handle to a full `Connection`
1289    pub fn upgrade(&self) -> Option<Connection> {
1290        self.0
1291            .upgrade()
1292            .map(|inner| Connection(ConnectionRef::from_arc(inner)))
1293    }
1294}
1295
1296#[derive(Debug, Default)]
1297pub(crate) struct Shared {
1298    handshake_confirmed: Notify,
1299    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
1300    /// control budget
1301    stream_budget_available: [Notify; 2],
1302    /// Notified when the peer has initiated a new stream
1303    stream_incoming: [Notify; 2],
1304    datagram_received: Notify,
1305    datagrams_unblocked: Notify,
1306    closed: Notify,
1307}
1308
1309pub(crate) struct State {
1310    pub(crate) inner: proto::Connection,
1311    driver: Option<Waker>,
1312    handle: ConnectionHandle,
1313    on_handshake_data: Option<oneshot::Sender<()>>,
1314    on_connected: Option<oneshot::Sender<bool>>,
1315    connected: bool,
1316    handshake_confirmed: bool,
1317    timer: Option<Pin<Box<dyn AsyncTimer>>>,
1318    timer_deadline: Option<Instant>,
1319    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1320    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1321    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
1322    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
1323    pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
1324    /// Always set to Some before the connection becomes drained
1325    pub(crate) error: Option<ConnectionError>,
1326    /// Tracks paths being opened
1327    open_path: FxHashMap<PathId, watch::Sender<Result<(), PathError>>>,
1328    /// Tracks paths being closed
1329    pub(crate) close_path: FxHashMap<PathId, oneshot::Sender<VarInt>>,
1330    pub(crate) path_events: tokio::sync::broadcast::Sender<PathEvent>,
1331    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
1332    ref_count: usize,
1333    sender: Pin<Box<dyn UdpSender>>,
1334    pub(crate) runtime: Arc<dyn Runtime>,
1335    send_buffer: Vec<u8>,
1336    /// We buffer a transmit when the underlying I/O would block
1337    buffered_transmit: Option<proto::Transmit>,
1338    /// Our last external address reported by the peer. When multipath is enabled, this will be the
1339    /// last report across all paths.
1340    pub(crate) observed_external_addr: watch::Sender<Option<SocketAddr>>,
1341    pub(crate) nat_traversal_updates: tokio::sync::broadcast::Sender<iroh_hp::Event>,
1342    on_closed: Vec<oneshot::Sender<(ConnectionError, ConnectionStats)>>,
1343}
1344
1345impl State {
1346    #[allow(clippy::too_many_arguments)]
1347    fn new(
1348        inner: proto::Connection,
1349        handle: ConnectionHandle,
1350        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
1351        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
1352        on_handshake_data: oneshot::Sender<()>,
1353        on_connected: oneshot::Sender<bool>,
1354        sender: Pin<Box<dyn UdpSender>>,
1355        runtime: Arc<dyn Runtime>,
1356    ) -> Self {
1357        Self {
1358            inner,
1359            driver: None,
1360            handle,
1361            on_handshake_data: Some(on_handshake_data),
1362            on_connected: Some(on_connected),
1363            connected: false,
1364            handshake_confirmed: false,
1365            timer: None,
1366            timer_deadline: None,
1367            conn_events,
1368            endpoint_events,
1369            blocked_writers: FxHashMap::default(),
1370            blocked_readers: FxHashMap::default(),
1371            stopped: FxHashMap::default(),
1372            open_path: FxHashMap::default(),
1373            close_path: FxHashMap::default(),
1374            error: None,
1375            ref_count: 0,
1376            sender,
1377            runtime,
1378            send_buffer: Vec::new(),
1379            buffered_transmit: None,
1380            path_events: tokio::sync::broadcast::channel(32).0,
1381            observed_external_addr: watch::Sender::new(None),
1382            nat_traversal_updates: tokio::sync::broadcast::channel(32).0,
1383            on_closed: Vec::new(),
1384        }
1385    }
1386
1387    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
1388        let now = self.runtime.now();
1389        let mut transmits = 0;
1390
1391        let max_datagrams = self
1392            .sender
1393            .max_transmit_segments()
1394            .min(MAX_TRANSMIT_SEGMENTS);
1395
1396        loop {
1397            // Retry the last transmit, or get a new one.
1398            let t = match self.buffered_transmit.take() {
1399                Some(t) => t,
1400                None => {
1401                    self.send_buffer.clear();
1402                    match self
1403                        .inner
1404                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
1405                    {
1406                        Some(t) => {
1407                            transmits += match t.segment_size {
1408                                None => 1,
1409                                Some(s) => t.size.div_ceil(s), // round up
1410                            };
1411                            t
1412                        }
1413                        None => break,
1414                    }
1415                }
1416            };
1417
1418            let len = t.size;
1419            match self
1420                .sender
1421                .as_mut()
1422                .poll_send(&udp_transmit(&t, &self.send_buffer[..len]), cx)
1423            {
1424                Poll::Pending => {
1425                    self.buffered_transmit = Some(t);
1426                    return Ok(false);
1427                }
1428                Poll::Ready(Err(e)) => return Err(e),
1429                Poll::Ready(Ok(())) => {}
1430            }
1431
1432            if transmits >= MAX_TRANSMIT_DATAGRAMS {
1433                // TODO: What isn't ideal here yet is that if we don't poll all
1434                // datagrams that could be sent we don't go into the `app_limited`
1435                // state and CWND continues to grow until we get here the next time.
1436                // See https://github.com/quinn-rs/quinn/issues/1126
1437                return Ok(true);
1438            }
1439        }
1440
1441        Ok(false)
1442    }
1443
1444    fn forward_endpoint_events(&mut self) {
1445        while let Some(event) = self.inner.poll_endpoint_events() {
1446            // If the endpoint driver is gone, noop.
1447            let _ = self.endpoint_events.send((self.handle, event));
1448        }
1449    }
1450
1451    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
1452    fn process_conn_events(
1453        &mut self,
1454        shared: &Shared,
1455        cx: &mut Context,
1456    ) -> Result<(), ConnectionError> {
1457        loop {
1458            match self.conn_events.poll_recv(cx) {
1459                Poll::Ready(Some(ConnectionEvent::Rebind(sender))) => {
1460                    self.sender = sender;
1461                    self.inner.local_address_changed();
1462                }
1463                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
1464                    self.inner.handle_event(event);
1465                }
1466                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
1467                    self.close(error_code, reason, shared);
1468                }
1469                Poll::Ready(None) => {
1470                    return Err(ConnectionError::TransportError(TransportError::new(
1471                        TransportErrorCode::INTERNAL_ERROR,
1472                        "endpoint driver future was dropped".to_string(),
1473                    )));
1474                }
1475                Poll::Pending => {
1476                    return Ok(());
1477                }
1478            }
1479        }
1480    }
1481
1482    fn forward_app_events(&mut self, shared: &Shared) {
1483        while let Some(event) = self.inner.poll() {
1484            use proto::Event::*;
1485            match event {
1486                HandshakeDataReady => {
1487                    if let Some(x) = self.on_handshake_data.take() {
1488                        let _ = x.send(());
1489                    }
1490                }
1491                Connected => {
1492                    self.connected = true;
1493                    if let Some(x) = self.on_connected.take() {
1494                        // We don't care if the on-connected future was dropped
1495                        let _ = x.send(self.inner.accepted_0rtt());
1496                    }
1497                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
1498                        // Wake up rejected 0-RTT streams so they can fail immediately with
1499                        // `ZeroRttRejected` errors.
1500                        wake_all(&mut self.blocked_writers);
1501                        wake_all(&mut self.blocked_readers);
1502                        wake_all_notify(&mut self.stopped);
1503                    }
1504                }
1505                HandshakeConfirmed => {
1506                    self.handshake_confirmed = true;
1507                    shared.handshake_confirmed.notify_waiters();
1508                }
1509                ConnectionLost { reason } => {
1510                    self.terminate(reason, shared);
1511                }
1512                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
1513                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
1514                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1515                }
1516                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
1517                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1518                }
1519                DatagramReceived => {
1520                    shared.datagram_received.notify_waiters();
1521                }
1522                DatagramsUnblocked => {
1523                    shared.datagrams_unblocked.notify_waiters();
1524                }
1525                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
1526                Stream(StreamEvent::Available { dir }) => {
1527                    // Might mean any number of streams are ready, so we wake up everyone
1528                    shared.stream_budget_available[dir as usize].notify_waiters();
1529                }
1530                Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
1531                Stream(StreamEvent::Stopped { id, .. }) => {
1532                    wake_stream_notify(id, &mut self.stopped);
1533                    wake_stream(id, &mut self.blocked_writers);
1534                }
1535                Path(ref evt @ PathEvent::ObservedAddr { addr: observed, .. }) => {
1536                    self.path_events.send(evt.clone()).ok();
1537                    self.observed_external_addr.send_if_modified(|addr| {
1538                        let old = addr.replace(observed);
1539                        old != *addr
1540                    });
1541                }
1542                Path(ref evt @ PathEvent::Opened { id }) => {
1543                    self.path_events.send(evt.clone()).ok();
1544                    if let Some(sender) = self.open_path.remove(&id) {
1545                        sender.send_modify(|value| *value = Ok(()));
1546                    }
1547                }
1548                Path(ref evt @ PathEvent::Closed { id, error_code }) => {
1549                    self.path_events.send(evt.clone()).ok();
1550                    if let Some(sender) = self.close_path.remove(&id) {
1551                        let _ = sender.send(error_code);
1552                    }
1553                }
1554                Path(evt @ PathEvent::Abandoned { .. }) => {
1555                    self.path_events.send(evt).ok();
1556                }
1557                Path(ref evt @ PathEvent::LocallyClosed { id, error }) => {
1558                    self.path_events.send(evt.clone()).ok();
1559                    if let Some(sender) = self.open_path.remove(&id) {
1560                        sender.send_modify(|value| *value = Err(error));
1561                    }
1562                    // this will happen also for already opened paths
1563                }
1564                Path(evt @ PathEvent::RemoteStatus { .. }) => {
1565                    self.path_events.send(evt).ok();
1566                }
1567                NatTraversal(update) => {
1568                    self.nat_traversal_updates.send(update).ok();
1569                }
1570            }
1571        }
1572    }
1573
1574    fn drive_timer(&mut self, cx: &mut Context) -> bool {
1575        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
1576        // timer is registered with the runtime (and check whether it's already
1577        // expired).
1578        match self.inner.poll_timeout() {
1579            Some(deadline) => {
1580                if let Some(delay) = &mut self.timer {
1581                    // There is no need to reset the tokio timer if the deadline
1582                    // did not change
1583                    if self
1584                        .timer_deadline
1585                        .map(|current_deadline| current_deadline != deadline)
1586                        .unwrap_or(true)
1587                    {
1588                        delay.as_mut().reset(deadline);
1589                    }
1590                } else {
1591                    self.timer = Some(self.runtime.new_timer(deadline));
1592                }
1593                // Store the actual expiration time of the timer
1594                self.timer_deadline = Some(deadline);
1595            }
1596            None => {
1597                self.timer_deadline = None;
1598                return false;
1599            }
1600        }
1601
1602        if self.timer_deadline.is_none() {
1603            return false;
1604        }
1605
1606        let delay = self
1607            .timer
1608            .as_mut()
1609            .expect("timer must exist in this state")
1610            .as_mut();
1611        if delay.poll(cx).is_pending() {
1612            // Since there wasn't a timeout event, there is nothing new
1613            // for the connection to do
1614            return false;
1615        }
1616
1617        // A timer expired, so the caller needs to check for
1618        // new transmits, which might cause new timers to be set.
1619        self.inner.handle_timeout(self.runtime.now());
1620        self.timer_deadline = None;
1621        true
1622    }
1623
1624    /// Wake up a blocked `Driver` task to process I/O
1625    pub(crate) fn wake(&mut self) {
1626        if let Some(x) = self.driver.take() {
1627            x.wake();
1628        }
1629    }
1630
1631    /// Used to wake up all blocked futures when the connection becomes closed for any reason
1632    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
1633        self.error = Some(reason.clone());
1634        if let Some(x) = self.on_handshake_data.take() {
1635            let _ = x.send(());
1636        }
1637        wake_all(&mut self.blocked_writers);
1638        wake_all(&mut self.blocked_readers);
1639        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
1640        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
1641        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
1642        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
1643        shared.datagram_received.notify_waiters();
1644        shared.datagrams_unblocked.notify_waiters();
1645        if let Some(x) = self.on_connected.take() {
1646            let _ = x.send(false);
1647        }
1648        shared.handshake_confirmed.notify_waiters();
1649        wake_all_notify(&mut self.stopped);
1650        shared.closed.notify_waiters();
1651
1652        // Send to the registered on_closed futures.
1653        let stats = self.inner.stats();
1654        for tx in self.on_closed.drain(..) {
1655            tx.send((reason.clone(), stats.clone())).ok();
1656        }
1657    }
1658
1659    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
1660        self.inner.close(self.runtime.now(), error_code, reason);
1661        self.terminate(ConnectionError::LocallyClosed, shared);
1662        self.wake();
1663    }
1664
1665    /// Close for a reason other than the application's explicit request
1666    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
1667        self.close(0u32.into(), Bytes::new(), shared);
1668    }
1669
1670    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
1671        if self.inner.is_handshaking()
1672            || self.inner.accepted_0rtt()
1673            || self.inner.side().is_server()
1674        {
1675            Ok(())
1676        } else {
1677            Err(())
1678        }
1679    }
1680}
1681
1682impl Drop for State {
1683    fn drop(&mut self) {
1684        if !self.inner.is_drained() {
1685            // Ensure the endpoint can tidy up
1686            let _ = self
1687                .endpoint_events
1688                .send((self.handle, proto::EndpointEvent::drained()));
1689        }
1690
1691        if !self.on_closed.is_empty() {
1692            // Ensure that all on_closed oneshot senders are triggered before dropping.
1693            let reason = self.error.as_ref().expect("closed without error reason");
1694            let stats = self.inner.stats();
1695            for tx in self.on_closed.drain(..) {
1696                tx.send((reason.clone(), stats.clone())).ok();
1697            }
1698        }
1699    }
1700}
1701
1702impl fmt::Debug for State {
1703    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1704        f.debug_struct("State").field("inner", &self.inner).finish()
1705    }
1706}
1707
1708fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
1709    if let Some(waker) = wakers.remove(&stream_id) {
1710        waker.wake();
1711    }
1712}
1713
1714fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
1715    wakers.drain().for_each(|(_, waker)| waker.wake())
1716}
1717
1718fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1719    if let Some(notify) = wakers.remove(&stream_id) {
1720        notify.notify_waiters()
1721    }
1722}
1723
1724fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
1725    wakers
1726        .drain()
1727        .for_each(|(_, notify)| notify.notify_waiters())
1728}
1729
1730/// Errors that can arise when sending a datagram
1731#[derive(Debug, Error, Clone, Eq, PartialEq)]
1732pub enum SendDatagramError {
1733    /// The peer does not support receiving datagram frames
1734    #[error("datagrams not supported by peer")]
1735    UnsupportedByPeer,
1736    /// Datagram support is disabled locally
1737    #[error("datagram support disabled")]
1738    Disabled,
1739    /// The datagram is larger than the connection can currently accommodate
1740    ///
1741    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
1742    /// exceeded.
1743    #[error("datagram too large")]
1744    TooLarge,
1745    /// The connection was lost
1746    #[error("connection lost")]
1747    ConnectionLost(#[from] ConnectionError),
1748}
1749
1750/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
1751///
1752/// This limits the amount of CPU resources consumed by datagram generation,
1753/// and allows other tasks (like receiving ACKs) to run in between.
1754const MAX_TRANSMIT_DATAGRAMS: usize = 20;
1755
1756/// The maximum amount of datagrams that are sent in a single transmit
1757///
1758/// This can be lower than the maximum platform capabilities, to avoid excessive
1759/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
1760/// that numbers around 10 are a good compromise.
1761const MAX_TRANSMIT_SEGMENTS: NonZeroUsize = NonZeroUsize::new(10).expect("known");