iroh_quinn/
path.rs

1use std::future::Future;
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll, ready};
6use std::time::Duration;
7
8use proto::{
9    ClosePathError, ClosedPath, ConnectionError, PathError, PathEvent, PathId, PathStatus,
10    SetPathStatusError, TransportErrorCode, VarInt,
11};
12use tokio::sync::{oneshot, watch};
13use tokio_stream::{Stream, wrappers::WatchStream};
14
15use crate::Runtime;
16use crate::connection::ConnectionRef;
17
/// Future produced by [`crate::Connection::open_path`]
///
/// Resolves to the opened [`Path`] on success, or to a [`PathError`] if the path could not
/// be opened.
pub struct OpenPath(OpenPathInner);
20
/// Internal state backing an [`OpenPath`] future.
enum OpenPathInner {
    /// Opening a path is underway
    ///
    /// This might fail later on.
    Ongoing {
        /// Stream over the watch channel that carries the outcome of the open attempt
        opened: WatchStream<Result<(), PathError>>,
        /// The ID of the path being opened
        path_id: PathId,
        /// Connection handle, cloned into the resulting [`Path`]
        conn: ConnectionRef,
    },
    /// Opening a path failed immediately
    Rejected {
        /// The error that occurred
        err: PathError,
    },
    /// The path is already open
    Ready {
        /// The ID of the already open path
        path_id: PathId,
        /// Connection handle, cloned into the resulting [`Path`]
        conn: ConnectionRef,
    },
}
41
42impl OpenPath {
43    pub(crate) fn new(
44        path_id: PathId,
45        opened: watch::Receiver<Result<(), PathError>>,
46        conn: ConnectionRef,
47    ) -> Self {
48        Self(OpenPathInner::Ongoing {
49            opened: WatchStream::from_changes(opened),
50            path_id,
51            conn,
52        })
53    }
54
55    pub(crate) fn ready(path_id: PathId, conn: ConnectionRef) -> Self {
56        Self(OpenPathInner::Ready { path_id, conn })
57    }
58
59    pub(crate) fn rejected(err: PathError) -> Self {
60        Self(OpenPathInner::Rejected { err })
61    }
62
63    /// Returns the path ID of the new path being opened.
64    ///
65    /// If an error occurred before a path ID was allocated, `None` is returned.  In this
66    /// case the future is ready and polling it will immediately yield the error.
67    ///
68    /// The returned value remains the same for the entire lifetime of this future.
69    pub fn path_id(&self) -> Option<PathId> {
70        match self.0 {
71            OpenPathInner::Ongoing { path_id, .. } => Some(path_id),
72            OpenPathInner::Rejected { .. } => None,
73            OpenPathInner::Ready { path_id, .. } => Some(path_id),
74        }
75    }
76}
77
78impl Future for OpenPath {
79    type Output = Result<Path, PathError>;
80    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
81        match self.get_mut().0 {
82            OpenPathInner::Ongoing {
83                ref mut opened,
84                path_id,
85                ref mut conn,
86            } => match ready!(Pin::new(opened).poll_next(ctx)) {
87                Some(value) => Poll::Ready(value.map(|_| Path {
88                    id: path_id,
89                    conn: conn.clone(),
90                })),
91                None => {
92                    // This only happens if receiving a notification change failed, this means the
93                    // sender was dropped. This generally should not happen so we use a transient
94                    // error
95                    Poll::Ready(Err(PathError::ValidationFailed))
96                }
97            },
98            OpenPathInner::Ready {
99                path_id,
100                ref mut conn,
101            } => Poll::Ready(Ok(Path {
102                id: path_id,
103                conn: conn.clone(),
104            })),
105            OpenPathInner::Rejected { err } => Poll::Ready(Err(err)),
106        }
107    }
108}
109
/// An open (Multi)Path
///
/// A handle made of a [`PathId`] and a reference to the connection the path belongs to.
/// All methods look the path up in the connection state by this ID.
#[derive(Debug)]
pub struct Path {
    /// The ID identifying this path within the connection
    pub(crate) id: PathId,
    /// The connection this path belongs to
    pub(crate) conn: ConnectionRef,
}
116
117impl Path {
118    /// The [`PathId`] of this path.
119    pub fn id(&self) -> PathId {
120        self.id
121    }
122
123    /// The current local [`PathStatus`] of this path.
124    pub fn status(&self) -> Result<PathStatus, ClosedPath> {
125        self.conn
126            .state
127            .lock("path status")
128            .inner
129            .path_status(self.id)
130    }
131
132    /// Sets the [`PathStatus`] of this path.
133    pub fn set_status(&self, status: PathStatus) -> Result<(), SetPathStatusError> {
134        self.conn
135            .state
136            .lock("set path status")
137            .inner
138            .set_path_status(self.id, status)?;
139        Ok(())
140    }
141
142    /// Closes this path
143    ///
144    /// The future will resolve when all the path state is dropped.  This only happens after
145    /// the remote has confirmed the path as closed **and** after an additional timeout to
146    /// give any in-flight packets the time to arrive.
147    pub fn close(&self) -> Result<ClosePath, ClosePathError> {
148        let (on_path_close_send, on_path_close_recv) = oneshot::channel();
149        {
150            let mut state = self.conn.state.lock("close_path");
151            state.inner.close_path(
152                crate::Instant::now(),
153                self.id,
154                TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
155            )?;
156            state.close_path.insert(self.id, on_path_close_send);
157        }
158
159        Ok(ClosePath {
160            closed: on_path_close_recv,
161        })
162    }
163
164    /// Sets the keep_alive_interval for a specific path
165    ///
166    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
167    ///
168    /// Returns the previous value of the setting.
169    ///
170    /// [`TransportConfig::default_path_keep_alive_interval`]: crate::TransportConfig::default_path_keep_alive_interval
171    pub fn set_max_idle_timeout(
172        &self,
173        timeout: Option<Duration>,
174    ) -> Result<Option<Duration>, ClosedPath> {
175        let mut state = self.conn.state.lock("path_set_max_idle_timeout");
176        state.inner.set_path_max_idle_timeout(self.id, timeout)
177    }
178
179    /// Sets the keep_alive_interval for a specific path
180    ///
181    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
182    ///
183    /// Returns the previous value of the setting.
184    ///
185    /// [`TransportConfig::default_path_keep_alive_interval`]: crate::TransportConfig::default_path_keep_alive_interval
186    pub fn set_keep_alive_interval(
187        &self,
188        interval: Option<Duration>,
189    ) -> Result<Option<Duration>, ClosedPath> {
190        let mut state = self.conn.state.lock("path_set_keep_alive_interval");
191        state.inner.set_path_keep_alive_interval(self.id, interval)
192    }
193
194    /// Track changes on our external address as reported by the peer.
195    ///
196    /// If the address-discovery extension is not negotiated, the stream will never return.
197    pub fn observed_external_addr(&self) -> Result<AddressDiscovery, ClosedPath> {
198        let state = self.conn.state.lock("per_path_observed_address");
199        let path_events = state.path_events.subscribe();
200        let initial_value = state.inner.path_observed_address(self.id)?;
201        Ok(AddressDiscovery::new(
202            self.id,
203            path_events,
204            initial_value,
205            state.runtime.clone(),
206        ))
207    }
208
209    /// The peer's UDP address for this path.
210    pub fn remote_address(&self) -> Result<SocketAddr, ClosedPath> {
211        let state = self.conn.state.lock("per_path_remote_address");
212        state.inner.path_remote_address(self.id)
213    }
214}
215
/// Future produced by [`Path::close`]
///
/// Resolves once a value is received on the oneshot channel registered by [`Path::close`].
pub struct ClosePath {
    /// Receiving half of the channel registered in the connection state by [`Path::close`]
    closed: oneshot::Receiver<VarInt>,
}
220
221impl Future for ClosePath {
222    type Output = Result<VarInt, ConnectionError>;
223    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
224        // TODO: thread through errors
225        let res = ready!(Pin::new(&mut self.closed).poll(ctx));
226        match res {
227            Ok(code) => Poll::Ready(Ok(code)),
228            Err(_err) => todo!(), // TODO: appropriate error
229        }
230    }
231}
232
/// Stream produced by [`Path::observed_external_addr`]
///
/// This will always return the external address most recently reported by the remote over this
/// path. If the extension is not negotiated, this stream will never return.
// TODO(@divma): provide a way to check if the extension is negotiated.
pub struct AddressDiscovery {
    /// Stream over the watch channel that a spawned task feeds with observed addresses
    watcher: WatchStream<SocketAddr>,
}
241
242impl AddressDiscovery {
243    pub(super) fn new(
244        path_id: PathId,
245        mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
246        initial_value: Option<SocketAddr>,
247        runtime: Arc<dyn Runtime>,
248    ) -> Self {
249        let (tx, rx) = watch::channel(initial_value.unwrap_or_else(||
250                // if the dummy value is used, it will be ignored
251                SocketAddr::new([0, 0, 0, 0].into(), 0)));
252        let filter = async move {
253            loop {
254                match path_events.recv().await {
255                    Ok(PathEvent::ObservedAddr { id, addr: observed }) if id == path_id => {
256                        tx.send_if_modified(|addr| {
257                            let old = std::mem::replace(addr, observed);
258                            old != *addr
259                        });
260                    }
261                    Ok(PathEvent::Abandoned { id, .. }) if id == path_id => {
262                        // If the path is closed, terminate the stream
263                        break;
264                    }
265                    Ok(_) => {
266                        // ignore any other event
267                    }
268                    Err(_) => {
269                        // A lagged error should never happen since this (detached) task is
270                        // constantly reading from the channel. Therefore, if an error does happen,
271                        // the stream can terminate
272                        break;
273                    }
274                }
275            }
276        };
277
278        let watcher = if initial_value.is_some() {
279            WatchStream::new(rx)
280        } else {
281            WatchStream::from_changes(rx)
282        };
283
284        runtime.spawn(Box::pin(filter));
285        // TODO(@divma): check if there's a way to ensure the future ends. AbortHandle is not an
286        // option
287        Self { watcher }
288    }
289}
290
291impl Stream for AddressDiscovery {
292    type Item = SocketAddr;
293
294    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
295        Pin::new(&mut self.watcher).poll_next(cx)
296    }
297}