iroh_quinn/
path.rs

1use std::future::Future;
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll, ready};
6use std::time::Duration;
7
8use proto::{
9    ClosePathError, ClosedPath, ConnectionError, PathError, PathEvent, PathId, PathStatus,
10    SetPathStatusError, TransportErrorCode, VarInt,
11};
12use tokio::sync::{oneshot, watch};
13use tokio_stream::{Stream, wrappers::WatchStream};
14
15use crate::Runtime;
16use crate::connection::ConnectionRef;
17
18/// Future produced by [`crate::Connection::open_path`]
19pub struct OpenPath(OpenPathInner);
20
21enum OpenPathInner {
22    /// Opening a path in underway
23    ///
24    /// This might fail later on.
25    Ongoing {
26        opened: WatchStream<Result<(), PathError>>,
27        path_id: PathId,
28        conn: ConnectionRef,
29    },
30    /// Opening a path failed immediately
31    Rejected {
32        /// The error that occurred
33        err: PathError,
34    },
35    /// The path is already open
36    Ready {
37        path_id: PathId,
38        conn: ConnectionRef,
39    },
40}
41
42impl OpenPath {
43    pub(crate) fn new(
44        path_id: PathId,
45        opened: watch::Receiver<Result<(), PathError>>,
46        conn: ConnectionRef,
47    ) -> Self {
48        Self(OpenPathInner::Ongoing {
49            opened: WatchStream::from_changes(opened),
50            path_id,
51            conn,
52        })
53    }
54
55    pub(crate) fn ready(path_id: PathId, conn: ConnectionRef) -> Self {
56        Self(OpenPathInner::Ready { path_id, conn })
57    }
58
59    pub(crate) fn rejected(err: PathError) -> Self {
60        Self(OpenPathInner::Rejected { err })
61    }
62
63    /// Returns the path ID of the new path being opened.
64    ///
65    /// If an error occurred before a path ID was allocated, `None` is returned.  In this
66    /// case the future is ready and polling it will immediately yield the error.
67    ///
68    /// The returned value remains the same for the entire lifetime of this future.
69    pub fn path_id(&self) -> Option<PathId> {
70        match self.0 {
71            OpenPathInner::Ongoing { path_id, .. } => Some(path_id),
72            OpenPathInner::Rejected { .. } => None,
73            OpenPathInner::Ready { path_id, .. } => Some(path_id),
74        }
75    }
76}
77
78impl Future for OpenPath {
79    type Output = Result<Path, PathError>;
80    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
81        match self.get_mut().0 {
82            OpenPathInner::Ongoing {
83                ref mut opened,
84                path_id,
85                ref mut conn,
86            } => match ready!(Pin::new(opened).poll_next(ctx)) {
87                Some(value) => Poll::Ready(value.map(|_| Path {
88                    id: path_id,
89                    conn: conn.clone(),
90                })),
91                None => {
92                    // This only happens if receiving a notification change failed, this means the
93                    // sender was dropped. This generally should not happen so we use a transient
94                    // error
95                    Poll::Ready(Err(PathError::ValidationFailed))
96                }
97            },
98            OpenPathInner::Ready {
99                path_id,
100                ref mut conn,
101            } => Poll::Ready(Ok(Path {
102                id: path_id,
103                conn: conn.clone(),
104            })),
105            OpenPathInner::Rejected { err } => Poll::Ready(Err(err)),
106        }
107    }
108}
109
110/// An open (Multi)Path
111#[derive(Debug)]
112pub struct Path {
113    pub(crate) id: PathId,
114    pub(crate) conn: ConnectionRef,
115}
116
117impl Path {
118    /// The [`PathId`] of this path.
119    pub fn id(&self) -> PathId {
120        self.id
121    }
122
123    /// The current local [`PathStatus`] of this path.
124    pub fn status(&self) -> Result<PathStatus, ClosedPath> {
125        self.conn
126            .state
127            .lock("path status")
128            .inner
129            .path_status(self.id)
130    }
131
132    /// Sets the [`PathStatus`] of this path.
133    pub fn set_status(&self, status: PathStatus) -> Result<(), SetPathStatusError> {
134        self.conn
135            .state
136            .lock("set path status")
137            .inner
138            .set_path_status(self.id, status)?;
139        Ok(())
140    }
141
142    /// Closes this path
143    ///
144    /// The future will resolve when all the path state is dropped.  This only happens after
145    /// the remote has confirmed the path as closed **and** after an additional timeout to
146    /// give any in-flight packets the time to arrive.
147    pub fn close(&self) -> Result<ClosePath, ClosePathError> {
148        let (on_path_close_send, on_path_close_recv) = oneshot::channel();
149        {
150            let mut state = self.conn.state.lock("close_path");
151            state.inner.close_path(
152                crate::Instant::now(),
153                self.id,
154                TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
155            )?;
156            state.close_path.insert(self.id, on_path_close_send);
157        }
158
159        Ok(ClosePath {
160            closed: on_path_close_recv,
161        })
162    }
163
164    /// Sets the keep_alive_interval for a specific path
165    ///
166    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
167    ///
168    /// Returns the previous value of the setting.
169    ///
170    /// [`TransportConfig::default_path_keep_alive_interval`]: crate::TransportConfig::default_path_keep_alive_interval
171    pub fn set_max_idle_timeout(
172        &self,
173        timeout: Option<Duration>,
174    ) -> Result<Option<Duration>, ClosedPath> {
175        let mut state = self.conn.state.lock("path_set_max_idle_timeout");
176        state.inner.set_path_max_idle_timeout(self.id, timeout)
177    }
178
179    /// Sets the keep_alive_interval for a specific path
180    ///
181    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
182    ///
183    /// Returns the previous value of the setting.
184    ///
185    /// [`TransportConfig::default_path_keep_alive_interval`]: crate::TransportConfig::default_path_keep_alive_interval
186    pub fn set_keep_alive_interval(
187        &self,
188        interval: Option<Duration>,
189    ) -> Result<Option<Duration>, ClosedPath> {
190        let mut state = self.conn.state.lock("path_set_keep_alive_interval");
191        state.inner.set_path_keep_alive_interval(self.id, interval)
192    }
193
194    /// Track changes on our external address as reported by the peer.
195    ///
196    /// If the address-discovery extension is not negotiated, the stream will never return.
197    pub fn observed_external_addr(&self) -> Result<AddressDiscovery, ClosedPath> {
198        let state = self.conn.state.lock("per_path_observed_address");
199        let path_events = state.path_events.subscribe();
200        let initial_value = state.inner.path_observed_address(self.id)?;
201        Ok(AddressDiscovery::new(
202            self.id,
203            path_events,
204            initial_value,
205            state.runtime.clone(),
206        ))
207    }
208
209    /// The peer's UDP address for this path.
210    pub fn remote_address(&self) -> Result<SocketAddr, ClosedPath> {
211        let state = self.conn.state.lock("per_path_remote_address");
212        Ok(state.inner.network_path(self.id)?.remote)
213    }
214
215    /// Ping the remote endpoint over this path.
216    pub fn ping(&self) -> Result<(), ClosedPath> {
217        let mut state = self.conn.state.lock("ping");
218        state.inner.ping_path(self.id)
219    }
220}
221
222/// Future produced by [`Path::close`]
223pub struct ClosePath {
224    closed: oneshot::Receiver<VarInt>,
225}
226
227impl Future for ClosePath {
228    type Output = Result<VarInt, ConnectionError>;
229    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
230        // TODO: thread through errors
231        let res = ready!(Pin::new(&mut self.closed).poll(ctx));
232        match res {
233            Ok(code) => Poll::Ready(Ok(code)),
234            Err(_err) => todo!(), // TODO: appropriate error
235        }
236    }
237}
238
239/// Stream produced by [`Path::observed_external_addr`]
240///
241/// This will always return the external address most recently reported by the remote over this
242/// path. If the extension is not negotiated, this stream will never return.
243// TODO(@divma): provide a way to check if the extension is negotiated.
244pub struct AddressDiscovery {
245    watcher: WatchStream<SocketAddr>,
246}
247
248impl AddressDiscovery {
249    pub(super) fn new(
250        path_id: PathId,
251        mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
252        initial_value: Option<SocketAddr>,
253        runtime: Arc<dyn Runtime>,
254    ) -> Self {
255        let (tx, rx) = watch::channel(initial_value.unwrap_or_else(||
256                // if the dummy value is used, it will be ignored
257                SocketAddr::new([0, 0, 0, 0].into(), 0)));
258        let filter = async move {
259            loop {
260                match path_events.recv().await {
261                    Ok(PathEvent::ObservedAddr { id, addr: observed }) if id == path_id => {
262                        tx.send_if_modified(|addr| {
263                            let old = std::mem::replace(addr, observed);
264                            old != *addr
265                        });
266                    }
267                    Ok(PathEvent::Abandoned { id, .. }) if id == path_id => {
268                        // If the path is closed, terminate the stream
269                        break;
270                    }
271                    Ok(_) => {
272                        // ignore any other event
273                    }
274                    Err(_) => {
275                        // A lagged error should never happen since this (detached) task is
276                        // constantly reading from the channel. Therefore, if an error does happen,
277                        // the stream can terminate
278                        break;
279                    }
280                }
281            }
282        };
283
284        let watcher = if initial_value.is_some() {
285            WatchStream::new(rx)
286        } else {
287            WatchStream::from_changes(rx)
288        };
289
290        runtime.spawn(Box::pin(filter));
291        // TODO(@divma): check if there's a way to ensure the future ends. AbortHandle is not an
292        // option
293        Self { watcher }
294    }
295}
296
297impl Stream for AddressDiscovery {
298    type Item = SocketAddr;
299
300    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
301        Pin::new(&mut self.watcher).poll_next(cx)
302    }
303}