1use std::future::Future;
2use std::net::SocketAddr;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll, ready};
6use std::time::Duration;
7
8use proto::{
9 ClosePathError, ClosedPath, ConnectionError, PathError, PathEvent, PathId, PathStatus,
10 SetPathStatusError, TransportErrorCode, VarInt,
11};
12use tokio::sync::{oneshot, watch};
13use tokio_stream::{Stream, wrappers::WatchStream};
14
15use crate::Runtime;
16use crate::connection::ConnectionRef;
17
/// Future that resolves once an attempt to open a path has concluded.
///
/// Awaiting it yields the opened [`Path`] on success, or the [`PathError`]
/// describing why the path could not be opened.
pub struct OpenPath(OpenPathInner);
20
/// Internal state backing [`OpenPath`].
enum OpenPathInner {
    /// The outcome is not known yet and is awaited through `opened`.
    Ongoing {
        /// Stream of results; the first item it yields decides the future's output.
        opened: WatchStream<Result<(), PathError>>,
        /// Identifier of the path being opened.
        path_id: PathId,
        /// Connection handle that is cloned into the resulting [`Path`].
        conn: ConnectionRef,
    },
    /// Opening failed up front; polling yields `err` immediately.
    Rejected {
        /// The error handed back to the caller.
        err: PathError,
    },
    /// No waiting is required; polling yields a [`Path`] immediately.
    Ready {
        /// Identifier of the path.
        path_id: PathId,
        /// Connection handle that is cloned into the resulting [`Path`].
        conn: ConnectionRef,
    },
}
41
42impl OpenPath {
43 pub(crate) fn new(
44 path_id: PathId,
45 opened: watch::Receiver<Result<(), PathError>>,
46 conn: ConnectionRef,
47 ) -> Self {
48 Self(OpenPathInner::Ongoing {
49 opened: WatchStream::from_changes(opened),
50 path_id,
51 conn,
52 })
53 }
54
55 pub(crate) fn ready(path_id: PathId, conn: ConnectionRef) -> Self {
56 Self(OpenPathInner::Ready { path_id, conn })
57 }
58
59 pub(crate) fn rejected(err: PathError) -> Self {
60 Self(OpenPathInner::Rejected { err })
61 }
62
63 pub fn path_id(&self) -> Option<PathId> {
70 match self.0 {
71 OpenPathInner::Ongoing { path_id, .. } => Some(path_id),
72 OpenPathInner::Rejected { .. } => None,
73 OpenPathInner::Ready { path_id, .. } => Some(path_id),
74 }
75 }
76}
77
78impl Future for OpenPath {
79 type Output = Result<Path, PathError>;
80 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
81 match self.get_mut().0 {
82 OpenPathInner::Ongoing {
83 ref mut opened,
84 path_id,
85 ref mut conn,
86 } => match ready!(Pin::new(opened).poll_next(ctx)) {
87 Some(value) => Poll::Ready(value.map(|_| Path {
88 id: path_id,
89 conn: conn.clone(),
90 })),
91 None => {
92 Poll::Ready(Err(PathError::ValidationFailed))
96 }
97 },
98 OpenPathInner::Ready {
99 path_id,
100 ref mut conn,
101 } => Poll::Ready(Ok(Path {
102 id: path_id,
103 conn: conn.clone(),
104 })),
105 OpenPathInner::Rejected { err } => Poll::Ready(Err(err)),
106 }
107 }
108}
109
/// Handle to a single path of a connection, addressed by its [`PathId`].
///
/// Every operation locks the shared connection state and forwards to the
/// per-path API of the underlying connection.
#[derive(Debug)]
pub struct Path {
    /// Identifier of the path this handle refers to.
    pub(crate) id: PathId,
    /// Reference to the connection the path belongs to.
    pub(crate) conn: ConnectionRef,
}
116
117impl Path {
118 pub fn id(&self) -> PathId {
120 self.id
121 }
122
123 pub fn status(&self) -> Result<PathStatus, ClosedPath> {
125 self.conn
126 .state
127 .lock("path status")
128 .inner
129 .path_status(self.id)
130 }
131
132 pub fn set_status(&self, status: PathStatus) -> Result<(), SetPathStatusError> {
134 self.conn
135 .state
136 .lock("set path status")
137 .inner
138 .set_path_status(self.id, status)?;
139 Ok(())
140 }
141
142 pub fn close(&self) -> Result<ClosePath, ClosePathError> {
148 let (on_path_close_send, on_path_close_recv) = oneshot::channel();
149 {
150 let mut state = self.conn.state.lock("close_path");
151 state.inner.close_path(
152 crate::Instant::now(),
153 self.id,
154 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
155 )?;
156 state.close_path.insert(self.id, on_path_close_send);
157 }
158
159 Ok(ClosePath {
160 closed: on_path_close_recv,
161 })
162 }
163
164 pub fn set_max_idle_timeout(
172 &self,
173 timeout: Option<Duration>,
174 ) -> Result<Option<Duration>, ClosedPath> {
175 let mut state = self.conn.state.lock("path_set_max_idle_timeout");
176 state.inner.set_path_max_idle_timeout(self.id, timeout)
177 }
178
179 pub fn set_keep_alive_interval(
187 &self,
188 interval: Option<Duration>,
189 ) -> Result<Option<Duration>, ClosedPath> {
190 let mut state = self.conn.state.lock("path_set_keep_alive_interval");
191 state.inner.set_path_keep_alive_interval(self.id, interval)
192 }
193
194 pub fn observed_external_addr(&self) -> Result<AddressDiscovery, ClosedPath> {
198 let state = self.conn.state.lock("per_path_observed_address");
199 let path_events = state.path_events.subscribe();
200 let initial_value = state.inner.path_observed_address(self.id)?;
201 Ok(AddressDiscovery::new(
202 self.id,
203 path_events,
204 initial_value,
205 state.runtime.clone(),
206 ))
207 }
208
209 pub fn remote_address(&self) -> Result<SocketAddr, ClosedPath> {
211 let state = self.conn.state.lock("per_path_remote_address");
212 state.inner.path_remote_address(self.id)
213 }
214}
215
/// Future returned by [`Path::close`].
///
/// It completes when a value arrives on the one-shot channel whose sending half
/// was registered in the connection state for the path being closed.
pub struct ClosePath {
    /// Receives the code delivered when the close notification fires.
    closed: oneshot::Receiver<VarInt>,
}
220
221impl Future for ClosePath {
222 type Output = Result<VarInt, ConnectionError>;
223 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
224 let res = ready!(Pin::new(&mut self.closed).poll(ctx));
226 match res {
227 Ok(code) => Poll::Ready(Ok(code)),
228 Err(_err) => todo!(), }
230 }
231}
232
/// Stream of the external addresses observed for a path.
///
/// Returned by [`Path::observed_external_addr`]. Items are produced from a
/// watch channel that a background task updates from the connection's path
/// events.
pub struct AddressDiscovery {
    /// Watch-backed stream that yields the observed addresses.
    watcher: WatchStream<SocketAddr>,
}
241
242impl AddressDiscovery {
243 pub(super) fn new(
244 path_id: PathId,
245 mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
246 initial_value: Option<SocketAddr>,
247 runtime: Arc<dyn Runtime>,
248 ) -> Self {
249 let (tx, rx) = watch::channel(initial_value.unwrap_or_else(||
250 SocketAddr::new([0, 0, 0, 0].into(), 0)));
252 let filter = async move {
253 loop {
254 match path_events.recv().await {
255 Ok(PathEvent::ObservedAddr { id, addr: observed }) if id == path_id => {
256 tx.send_if_modified(|addr| {
257 let old = std::mem::replace(addr, observed);
258 old != *addr
259 });
260 }
261 Ok(PathEvent::Abandoned { id, .. }) if id == path_id => {
262 break;
264 }
265 Ok(_) => {
266 }
268 Err(_) => {
269 break;
273 }
274 }
275 }
276 };
277
278 let watcher = if initial_value.is_some() {
279 WatchStream::new(rx)
280 } else {
281 WatchStream::from_changes(rx)
282 };
283
284 runtime.spawn(Box::pin(filter));
285 Self { watcher }
288 }
289}
290
291impl Stream for AddressDiscovery {
292 type Item = SocketAddr;
293
294 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
295 Pin::new(&mut self.watcher).poll_next(cx)
296 }
297}