1use std::future::Future;
2use std::net::{IpAddr, SocketAddr};
3use std::pin::Pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::task::{Context, Poll, ready};
7use std::time::Duration;
8
9use proto::{
10 ClosePathError, ClosedPath, FourTuple, PathError, PathEvent, PathId, PathStats, PathStatus,
11 SetPathStatusError, TransportErrorCode,
12};
13use tokio::sync::watch;
14use tokio_stream::{Stream, wrappers::WatchStream};
15
16use crate::connection::ConnectionRef;
17use crate::{Runtime, WeakConnectionHandle};
18
19pub struct OpenPath(OpenPathInner);
21
22enum OpenPathInner {
23 Ongoing {
27 opened: WatchStream<Result<(), PathError>>,
28 path_id: PathId,
29 conn: ConnectionRef,
30 },
31 Rejected {
33 err: PathError,
35 },
36 Ready {
38 path_id: PathId,
39 conn: ConnectionRef,
40 },
41}
42
43impl OpenPath {
44 pub(crate) fn new(
45 path_id: PathId,
46 opened: watch::Receiver<Result<(), PathError>>,
47 conn: ConnectionRef,
48 ) -> Self {
49 Self(OpenPathInner::Ongoing {
50 opened: WatchStream::from_changes(opened),
51 path_id,
52 conn,
53 })
54 }
55
56 pub(crate) fn ready(path_id: PathId, conn: ConnectionRef) -> Self {
57 Self(OpenPathInner::Ready { path_id, conn })
58 }
59
60 pub(crate) fn rejected(err: PathError) -> Self {
61 Self(OpenPathInner::Rejected { err })
62 }
63
64 pub fn path_id(&self) -> Option<PathId> {
71 match self.0 {
72 OpenPathInner::Ongoing { path_id, .. } => Some(path_id),
73 OpenPathInner::Rejected { .. } => None,
74 OpenPathInner::Ready { path_id, .. } => Some(path_id),
75 }
76 }
77}
78
79impl Future for OpenPath {
80 type Output = Result<Path, PathError>;
81 fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
82 match self.get_mut().0 {
83 OpenPathInner::Ongoing {
84 ref mut opened,
85 path_id,
86 ref mut conn,
87 } => match ready!(Pin::new(opened).poll_next(ctx)) {
88 Some(value) => {
89 Poll::Ready(value.map(|_| Path::new_unchecked(conn.clone(), path_id)))
90 }
91 None => {
92 Poll::Ready(Err(PathError::ValidationFailed))
96 }
97 },
98 OpenPathInner::Ready {
99 path_id,
100 ref mut conn,
101 } => Poll::Ready(Ok(Path::new_unchecked(conn.clone(), path_id))),
102 OpenPathInner::Rejected { err } => Poll::Ready(Err(err)),
103 }
104 }
105}
106
107#[derive(Debug, Clone)]
114pub struct Path {
115 conn: ConnectionRef,
116 path_ref: PathRef,
117}
118
119impl Path {
120 pub(crate) fn new(conn: &ConnectionRef, id: PathId) -> Option<Self> {
122 let path_ref = {
123 let mut state = conn.lock_without_waking("Path::new");
124 state.inner.path_status(id).ok()?;
126 state.acquire_path_ref(id)
127 };
128 Some(Self {
129 conn: conn.clone(),
130 path_ref,
131 })
132 }
133
134 fn new_unchecked(conn: ConnectionRef, id: PathId) -> Self {
136 let path_ref = conn
137 .lock_without_waking("Path::new_unchecked")
138 .acquire_path_ref(id);
139 Self { conn, path_ref }
140 }
141
142 pub fn weak_handle(&self) -> WeakPathHandle {
148 WeakPathHandle {
149 path_ref: self.path_ref.clone(),
150 conn: self.conn.weak_handle(),
151 }
152 }
153
154 pub fn id(&self) -> PathId {
156 self.path_ref.id
157 }
158
159 pub fn status(&self) -> Result<PathStatus, ClosedPath> {
161 self.conn
162 .lock_without_waking("path status")
163 .inner
164 .path_status(self.id())
165 }
166
167 pub fn set_status(&self, status: PathStatus) -> Result<PathStatus, SetPathStatusError> {
171 self.conn
172 .lock_and_wake("set path status")
173 .inner
174 .set_path_status(self.id(), status)
175 }
176
177 pub fn stats(&self) -> PathStats {
179 self.conn
188 .lock_without_waking("Path::stats")
189 .path_stats(self.id())
190 .expect("either path stats or discarded path stats are always set as long as Path is not dropped")
191 }
192
193 pub fn close(&self) -> Result<(), ClosePathError> {
199 let mut state = self.conn.lock_and_wake("close_path");
200 state.inner.close_path(
201 crate::Instant::now(),
202 self.id(),
203 TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
204 )
205 }
206
207 pub fn set_max_idle_timeout(
215 &self,
216 timeout: Option<Duration>,
217 ) -> Result<Option<Duration>, ClosedPath> {
218 let mut state = self.conn.lock_and_wake("path_set_max_idle_timeout");
219 let now = state.runtime.now();
220 state
221 .inner
222 .set_path_max_idle_timeout(now, self.id(), timeout)
223 }
224
225 pub fn set_keep_alive_interval(
233 &self,
234 interval: Option<Duration>,
235 ) -> Result<Option<Duration>, ClosedPath> {
236 let mut state = self.conn.lock_and_wake("path_set_keep_alive_interval");
237 state
238 .inner
239 .set_path_keep_alive_interval(self.id(), interval)
240 }
241
242 pub fn observed_external_addr(&self) -> Result<AddressDiscovery, ClosedPath> {
246 let state = self.conn.lock_without_waking("per_path_observed_address");
247 let path_events = state.path_events.subscribe();
248 let initial_value = state.inner.path_observed_address(self.id())?;
249 Ok(AddressDiscovery::new(
250 self.id(),
251 path_events,
252 initial_value,
253 state.runtime.clone(),
254 ))
255 }
256
257 pub fn remote_address(&self) -> Result<SocketAddr, ClosedPath> {
259 let state = self.conn.lock_without_waking("per_path_remote_address");
260 Ok(state.inner.network_path(self.id())?.remote())
261 }
262
263 pub fn local_ip(&self) -> Result<Option<IpAddr>, ClosedPath> {
268 let state = self.conn.lock_without_waking("per_path_local_ip");
269 Ok(state.inner.network_path(self.id())?.local_ip())
270 }
271
272 pub fn network_path(&self) -> Result<FourTuple, ClosedPath> {
276 let state = self.conn.lock_without_waking("per_path_local_ip");
277 state.inner.network_path(self.id())
278 }
279
280 pub fn ping(&self) -> Result<(), ClosedPath> {
282 let mut state = self.conn.lock_and_wake("ping");
283 state.inner.ping_path(self.id())
284 }
285}
286
287impl Drop for Path {
288 fn drop(&mut self) {
289 self.path_ref.on_drop(&self.conn);
290 }
291}
292
293impl PartialEq for Path {
294 fn eq(&self, other: &Self) -> bool {
295 self.id() == other.id() && self.conn.stable_id() == other.conn.stable_id()
296 }
297}
298
299#[derive(Debug, Clone)]
308pub struct WeakPathHandle {
309 conn: WeakConnectionHandle,
310 path_ref: PathRef,
311}
312
313impl PartialEq for WeakPathHandle {
314 fn eq(&self, other: &Self) -> bool {
315 self.id() == other.id() && self.conn.is_same_connection(&other.conn)
316 }
317}
318
319impl Eq for WeakPathHandle {}
320
321impl WeakPathHandle {
322 pub fn id(&self) -> PathId {
324 self.path_ref.id
325 }
326
327 pub fn upgrade(&self) -> Option<Path> {
331 let conn = self.conn.upgrade_to_ref()?;
332 Some(Path {
333 conn,
334 path_ref: self.path_ref.clone(),
335 })
336 }
337}
338
339impl Drop for WeakPathHandle {
340 fn drop(&mut self) {
341 if let Some(conn) = self.conn.upgrade_to_ref() {
342 self.path_ref.on_drop(&conn);
343 }
344 }
345}
346
347#[derive(Debug, Default)]
354pub(crate) struct PathRefOwner {
355 ref_count: Arc<AtomicUsize>,
356}
357
358impl PathRefOwner {
359 pub(crate) fn acquire(&self, path_id: PathId) -> PathRef {
361 self.ref_count.fetch_add(1, Ordering::Relaxed);
362 PathRef {
363 id: path_id,
364 ref_count: self.ref_count.clone(),
365 }
366 }
367}
368
369#[derive(Debug)]
377pub(crate) struct PathRef {
378 id: PathId,
379 ref_count: Arc<AtomicUsize>,
380}
381
382impl Clone for PathRef {
383 fn clone(&self) -> Self {
384 self.ref_count.fetch_add(1, Ordering::Relaxed);
385 Self {
386 id: self.id,
387 ref_count: self.ref_count.clone(),
388 }
389 }
390}
391
392impl PathRef {
393 fn on_drop(&self, conn: &ConnectionRef) {
398 if self.ref_count.fetch_sub(1, Ordering::Relaxed) > 1 {
399 return;
400 }
401 let mut state = conn.lock_without_waking("PathRef::drop");
402 if self.ref_count.load(Ordering::Relaxed) > 0 {
405 return;
406 }
407 state.path_refs.remove(&self.id);
408 state.final_path_stats.remove(&self.id);
409 }
410}
411
412pub struct AddressDiscovery {
418 watcher: WatchStream<SocketAddr>,
419}
420
421impl AddressDiscovery {
422 pub(super) fn new(
423 path_id: PathId,
424 mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
425 initial_value: Option<SocketAddr>,
426 runtime: Arc<dyn Runtime>,
427 ) -> Self {
428 let (tx, rx) = watch::channel(initial_value.unwrap_or_else(||
429 SocketAddr::new([0, 0, 0, 0].into(), 0)));
431 let filter = async move {
432 loop {
433 match path_events.recv().await {
434 Ok(PathEvent::ObservedAddr {
435 id, addr: observed, ..
436 }) if id == path_id => {
437 tx.send_if_modified(|addr| {
438 let old = std::mem::replace(addr, observed);
439 old != *addr
440 });
441 }
442 Ok(PathEvent::Discarded { id, .. }) if id == path_id => {
443 break;
445 }
446 Ok(_) => {
447 }
449 Err(_) => {
450 break;
454 }
455 }
456 }
457 };
458
459 let watcher = if initial_value.is_some() {
460 WatchStream::new(rx)
461 } else {
462 WatchStream::from_changes(rx)
463 };
464
465 runtime.spawn(Box::pin(filter));
466 Self { watcher }
469 }
470}
471
472impl Stream for AddressDiscovery {
473 type Item = SocketAddr;
474
475 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
476 Pin::new(&mut self.watcher).poll_next(cx)
477 }
478}