// noq/path.rs

use std::future::{Future, poll_fn};
use std::net::{IpAddr, SocketAddr};
use std::pin::{Pin, pin};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, ready};
use std::time::Duration;

use proto::{
    ClosePathError, ClosedPath, FourTuple, PathError, PathEvent, PathId, PathStats, PathStatus,
    SetPathStatusError, TransportErrorCode,
};
use tokio::sync::watch;
use tokio_stream::{Stream, wrappers::WatchStream};

use crate::connection::ConnectionRef;
use crate::{Runtime, WeakConnectionHandle};
18
19/// Future produced by [`crate::Connection::open_path`]
20pub struct OpenPath(OpenPathInner);
21
22enum OpenPathInner {
23    /// Opening a path in underway
24    ///
25    /// This might fail later on.
26    Ongoing {
27        opened: WatchStream<Result<(), PathError>>,
28        path_id: PathId,
29        conn: ConnectionRef,
30    },
31    /// Opening a path failed immediately
32    Rejected {
33        /// The error that occurred
34        err: PathError,
35    },
36    /// The path is already open
37    Ready {
38        path_id: PathId,
39        conn: ConnectionRef,
40    },
41}
42
43impl OpenPath {
44    pub(crate) fn new(
45        path_id: PathId,
46        opened: watch::Receiver<Result<(), PathError>>,
47        conn: ConnectionRef,
48    ) -> Self {
49        Self(OpenPathInner::Ongoing {
50            opened: WatchStream::from_changes(opened),
51            path_id,
52            conn,
53        })
54    }
55
56    pub(crate) fn ready(path_id: PathId, conn: ConnectionRef) -> Self {
57        Self(OpenPathInner::Ready { path_id, conn })
58    }
59
60    pub(crate) fn rejected(err: PathError) -> Self {
61        Self(OpenPathInner::Rejected { err })
62    }
63
64    /// Returns the path ID of the new path being opened.
65    ///
66    /// If an error occurred before a path ID was allocated, `None` is returned.  In this
67    /// case the future is ready and polling it will immediately yield the error.
68    ///
69    /// The returned value remains the same for the entire lifetime of this future.
70    pub fn path_id(&self) -> Option<PathId> {
71        match self.0 {
72            OpenPathInner::Ongoing { path_id, .. } => Some(path_id),
73            OpenPathInner::Rejected { .. } => None,
74            OpenPathInner::Ready { path_id, .. } => Some(path_id),
75        }
76    }
77}
78
79impl Future for OpenPath {
80    type Output = Result<Path, PathError>;
81    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
82        match self.get_mut().0 {
83            OpenPathInner::Ongoing {
84                ref mut opened,
85                path_id,
86                ref mut conn,
87            } => match ready!(Pin::new(opened).poll_next(ctx)) {
88                Some(value) => {
89                    Poll::Ready(value.map(|_| Path::new_unchecked(conn.clone(), path_id)))
90                }
91                None => {
92                    // This only happens if receiving a notification change failed, this means the
93                    // sender was dropped. This generally should not happen so we use a transient
94                    // error
95                    Poll::Ready(Err(PathError::ValidationFailed))
96                }
97            },
98            OpenPathInner::Ready {
99                path_id,
100                ref mut conn,
101            } => Poll::Ready(Ok(Path::new_unchecked(conn.clone(), path_id))),
102            OpenPathInner::Rejected { err } => Poll::Ready(Err(err)),
103        }
104    }
105}
106
107/// An open network transmission within a multipath-enabled connection.
108///
109/// As long as a [`Path`] or [`WeakPathHandle`] is alive, it is ensured that the
110/// [`PathStats`] for this path are not dropped even after the path is abandoned.
111///
112/// [`WeakPathHandle`]: crate::path::WeakPathHandle
113#[derive(Debug, Clone)]
114pub struct Path {
115    conn: ConnectionRef,
116    path_ref: PathRef,
117}
118
119impl Path {
120    /// Returns a [`Path`] for a path id, after checking that the path is not closed.
121    pub(crate) fn new(conn: &ConnectionRef, id: PathId) -> Option<Self> {
122        let path_ref = {
123            let mut state = conn.lock_without_waking("Path::new");
124            // TODO(flub): Using this to know if the path still exists is... hacky.
125            state.inner.path_status(id).ok()?;
126            state.acquire_path_ref(id)
127        };
128        Some(Self {
129            conn: conn.clone(),
130            path_ref,
131        })
132    }
133
134    /// Returns a [`Path`] for a path id without checking if the path exists or is closed.
135    fn new_unchecked(conn: ConnectionRef, id: PathId) -> Self {
136        let path_ref = conn
137            .lock_without_waking("Path::new_unchecked")
138            .acquire_path_ref(id);
139        Self { conn, path_ref }
140    }
141
142    /// Returns a [`WeakPathHandle`] for this path.
143    ///
144    /// Holding a [`WeakPathHandle`] does not keep a connection alive, but ensures that the
145    /// path's stats are not dropped until the underlying connection is dropped, even if the
146    /// path is abandoned.
147    pub fn weak_handle(&self) -> WeakPathHandle {
148        WeakPathHandle {
149            path_ref: self.path_ref.clone(),
150            conn: self.conn.weak_handle(),
151        }
152    }
153
154    /// The [`PathId`] of this path.
155    pub fn id(&self) -> PathId {
156        self.path_ref.id
157    }
158
159    /// The current local [`PathStatus`] of this path.
160    pub fn status(&self) -> Result<PathStatus, ClosedPath> {
161        self.conn
162            .lock_without_waking("path status")
163            .inner
164            .path_status(self.id())
165    }
166
167    /// Sets the [`PathStatus`] of this path.
168    ///
169    /// Returns the previous status of the path.
170    pub fn set_status(&self, status: PathStatus) -> Result<PathStatus, SetPathStatusError> {
171        self.conn
172            .lock_and_wake("set path status")
173            .inner
174            .set_path_status(self.id(), status)
175    }
176
177    /// Returns the [`PathStats`] for this path.
178    pub fn stats(&self) -> PathStats {
179        // The `expect` is safe:
180        // - `Path` can only be created for non-closed paths.
181        // - `Path` and its clones or `WeakPathHandle`s all increment the connection state's `path_ref`
182        //   reference counter
183        // - As long as a path is not abandoned, its stats are available from `proto::Connection`
184        // - If a path is abandoned, the `crate::Connection` stores the final stats as long as
185        //   the path's refcount is not 0
186        // - Therefore, we always get stats here.
187        self.conn
188            .lock_without_waking("Path::stats")
189            .path_stats(self.id())
190            .expect("either path stats or discarded path stats are always set as long as Path is not dropped")
191    }
192
193    /// Closes this path.
194    ///
195    /// The path is immediately considered closed by the local endpoint. Once the state is removed,
196    /// after a short period of time for any in-flight packets, a [`PathEvent::Abandoned`] is
197    /// returned.
198    pub fn close(&self) -> Result<(), ClosePathError> {
199        let mut state = self.conn.lock_and_wake("close_path");
200        state.inner.close_path(
201            crate::Instant::now(),
202            self.id(),
203            TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
204        )
205    }
206
207    /// Sets the max idle timeout for a specific path
208    ///
209    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
210    ///
211    /// Returns the previous value of the setting.
212    ///
213    /// [`TransportConfig::default_path_max_idle_timeout`]: crate::TransportConfig::default_path_max_idle_timeout
214    pub fn set_max_idle_timeout(
215        &self,
216        timeout: Option<Duration>,
217    ) -> Result<Option<Duration>, ClosedPath> {
218        let mut state = self.conn.lock_and_wake("path_set_max_idle_timeout");
219        let now = state.runtime.now();
220        state
221            .inner
222            .set_path_max_idle_timeout(now, self.id(), timeout)
223    }
224
225    /// Sets the keep_alive_interval for a specific path
226    ///
227    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
228    ///
229    /// Returns the previous value of the setting.
230    ///
231    /// [`TransportConfig::default_path_keep_alive_interval`]: crate::TransportConfig::default_path_keep_alive_interval
232    pub fn set_keep_alive_interval(
233        &self,
234        interval: Option<Duration>,
235    ) -> Result<Option<Duration>, ClosedPath> {
236        let mut state = self.conn.lock_and_wake("path_set_keep_alive_interval");
237        state
238            .inner
239            .set_path_keep_alive_interval(self.id(), interval)
240    }
241
242    /// Track changes on our external address as reported by the peer.
243    ///
244    /// If the address-discovery extension is not negotiated, the stream will never return.
245    pub fn observed_external_addr(&self) -> Result<AddressDiscovery, ClosedPath> {
246        let state = self.conn.lock_without_waking("per_path_observed_address");
247        let path_events = state.path_events.subscribe();
248        let initial_value = state.inner.path_observed_address(self.id())?;
249        Ok(AddressDiscovery::new(
250            self.id(),
251            path_events,
252            initial_value,
253            state.runtime.clone(),
254        ))
255    }
256
257    /// The peer's UDP address for this path.
258    pub fn remote_address(&self) -> Result<SocketAddr, ClosedPath> {
259        let state = self.conn.lock_without_waking("per_path_remote_address");
260        Ok(state.inner.network_path(self.id())?.remote())
261    }
262
263    /// The local IP used for this path, if known.
264    ///
265    /// Returns `Ok(None)` for clients or when the platform does not expose this information; see
266    /// [`noq_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for supported platforms.
267    pub fn local_ip(&self) -> Result<Option<IpAddr>, ClosedPath> {
268        let state = self.conn.lock_without_waking("per_path_local_ip");
269        Ok(state.inner.network_path(self.id())?.local_ip())
270    }
271
272    /// The network path used for this path.
273    ///
274    /// Returns a [`FourTuple`], combining [`Self::remote_address`] and [`Self::local_ip`].
275    pub fn network_path(&self) -> Result<FourTuple, ClosedPath> {
276        let state = self.conn.lock_without_waking("per_path_local_ip");
277        state.inner.network_path(self.id())
278    }
279
280    /// Ping the remote endpoint over this path.
281    pub fn ping(&self) -> Result<(), ClosedPath> {
282        let mut state = self.conn.lock_and_wake("ping");
283        state.inner.ping_path(self.id())
284    }
285}
286
287impl Drop for Path {
288    fn drop(&mut self) {
289        self.path_ref.on_drop(&self.conn);
290    }
291}
292
293impl PartialEq for Path {
294    fn eq(&self, other: &Self) -> bool {
295        self.id() == other.id() && self.conn.stable_id() == other.conn.stable_id()
296    }
297}
298
299/// Weak handle for a [`Path`] that does not keep the connection alive.
300///
301/// As long as a [`WeakPathHandle`] for a path exists, that path's final stats will not be dropped even if
302/// the path was abandoned.
303///
304/// The [`WeakPathHandle`] can be upgraded to a [`Path`] as long as its [`Connection`] has not been dropped.
305///
306/// [`Connection`]: crate::Connection
307#[derive(Debug, Clone)]
308pub struct WeakPathHandle {
309    conn: WeakConnectionHandle,
310    path_ref: PathRef,
311}
312
313impl PartialEq for WeakPathHandle {
314    fn eq(&self, other: &Self) -> bool {
315        self.id() == other.id() && self.conn.is_same_connection(&other.conn)
316    }
317}
318
319impl Eq for WeakPathHandle {}
320
321impl WeakPathHandle {
322    /// Returns the [`PathId`] of this path.
323    pub fn id(&self) -> PathId {
324        self.path_ref.id
325    }
326
327    /// Upgrades to a [`Path`].
328    ///
329    /// Returns `None` if the connection was dropped.
330    pub fn upgrade(&self) -> Option<Path> {
331        let conn = self.conn.upgrade_to_ref()?;
332        Some(Path {
333            conn,
334            path_ref: self.path_ref.clone(),
335        })
336    }
337}
338
339impl Drop for WeakPathHandle {
340    fn drop(&mut self) {
341        if let Some(conn) = self.conn.upgrade_to_ref() {
342            self.path_ref.on_drop(&conn);
343        }
344    }
345}
346
/// The owning side of a path's reference counter, kept in [`State::path_refs`].
///
/// It holds the shared [`AtomicUsize`] without contributing to the count itself, and hands
/// out counted [`PathRef`] handles through [`Self::acquire`].
///
/// [`State::path_refs`]: crate::connection::State::path_refs
#[derive(Debug, Default)]
pub(crate) struct PathRefOwner {
    /// Number of live [`PathRef`] handles for the path.
    ref_count: Arc<AtomicUsize>,
}
357
358impl PathRefOwner {
359    /// Acquire a new [`PathRef`] handle, bumping the reference counter by 1.
360    pub(crate) fn acquire(&self, path_id: PathId) -> PathRef {
361        self.ref_count.fetch_add(1, Ordering::Relaxed);
362        PathRef {
363            id: path_id,
364            ref_count: self.ref_count.clone(),
365        }
366    }
367}
368
369/// Handle side of a path's reference counter, held by [`Path`] and [`WeakPathHandle`].
370///
371/// Cloning bumps the counter automatically. When dropping a `PathRef`, holders must call
372/// [`Self::on_drop`] to decrement the counter and clear the corresponding entries
373/// from the connection state if the counter reaches zero.
374///
375/// [`WeakPathHandle`]: crate::path::WeakPathHandle
376#[derive(Debug)]
377pub(crate) struct PathRef {
378    id: PathId,
379    ref_count: Arc<AtomicUsize>,
380}
381
382impl Clone for PathRef {
383    fn clone(&self) -> Self {
384        self.ref_count.fetch_add(1, Ordering::Relaxed);
385        Self {
386            id: self.id,
387            ref_count: self.ref_count.clone(),
388        }
389    }
390}
391
392impl PathRef {
393    /// Decreases the refcount and clears state once the counter reaches zero.
394    ///
395    /// This must be called before dropping a [`PathRef`], i.e. in the [`Drop`] impl
396    /// of its holder.
397    fn on_drop(&self, conn: &ConnectionRef) {
398        if self.ref_count.fetch_sub(1, Ordering::Relaxed) > 1 {
399            return;
400        }
401        let mut state = conn.lock_without_waking("PathRef::drop");
402        // Re-check under the lock: a concurrent `Path::new` may have bumped
403        // the counter back up between our `fetch_sub` and the lock.
404        if self.ref_count.load(Ordering::Relaxed) > 0 {
405            return;
406        }
407        state.path_refs.remove(&self.id);
408        state.final_path_stats.remove(&self.id);
409    }
410}
411
412/// Stream produced by [`Path::observed_external_addr`]
413///
414/// This will always return the external address most recently reported by the remote over this
415/// path. If the extension is not negotiated, this stream will never return.
416// TODO(@divma): provide a way to check if the extension is negotiated.
417pub struct AddressDiscovery {
418    watcher: WatchStream<SocketAddr>,
419}
420
421impl AddressDiscovery {
422    pub(super) fn new(
423        path_id: PathId,
424        mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
425        initial_value: Option<SocketAddr>,
426        runtime: Arc<dyn Runtime>,
427    ) -> Self {
428        let (tx, rx) = watch::channel(initial_value.unwrap_or_else(||
429                // if the dummy value is used, it will be ignored
430                SocketAddr::new([0, 0, 0, 0].into(), 0)));
431        let filter = async move {
432            loop {
433                match path_events.recv().await {
434                    Ok(PathEvent::ObservedAddr {
435                        id, addr: observed, ..
436                    }) if id == path_id => {
437                        tx.send_if_modified(|addr| {
438                            let old = std::mem::replace(addr, observed);
439                            old != *addr
440                        });
441                    }
442                    Ok(PathEvent::Discarded { id, .. }) if id == path_id => {
443                        // If the path is closed, terminate the stream
444                        break;
445                    }
446                    Ok(_) => {
447                        // ignore any other event
448                    }
449                    Err(_) => {
450                        // A lagged error should never happen since this (detached) task is
451                        // constantly reading from the channel. Therefore, if an error does happen,
452                        // the stream can terminate
453                        break;
454                    }
455                }
456            }
457        };
458
459        let watcher = if initial_value.is_some() {
460            WatchStream::new(rx)
461        } else {
462            WatchStream::from_changes(rx)
463        };
464
465        runtime.spawn(Box::pin(filter));
466        // TODO(@divma): check if there's a way to ensure the future ends. AbortHandle is not an
467        // option
468        Self { watcher }
469    }
470}
471
472impl Stream for AddressDiscovery {
473    type Item = SocketAddr;
474
475    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
476        Pin::new(&mut self.watcher).poll_next(cx)
477    }
478}