noq/
event_stream.rs

//! Newtype wrappers around broadcast and watch receivers for connection events.
2
3use std::fmt;
4use std::net::SocketAddr;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use proto::PathEvent;
9use proto::n0_nat_traversal;
10use thiserror::Error;
11use tokio::sync::{broadcast, watch};
12use tokio_stream::Stream;
13use tokio_stream::wrappers::{BroadcastStream, WatchStream, errors::BroadcastStreamRecvError};
14
15/// The receiver lagged too far behind.
16///
17/// Attempting to receive again will return the oldest message still retained
18/// by the channel.
19#[derive(Debug, Clone, PartialEq, Eq, Error)]
20#[error("channel lagged by {0}")]
21pub struct Lagged(pub u64);
22
23/// A stream of [`PathEvent`]s for all paths in a connection.
24#[derive(Debug)]
25pub struct PathEvents {
26    inner: BroadcastStream<PathEvent>,
27}
28
29impl PathEvents {
30    pub(crate) fn new(rx: broadcast::Receiver<PathEvent>) -> Self {
31        Self {
32            inner: BroadcastStream::new(rx),
33        }
34    }
35}
36
37impl Stream for PathEvents {
38    type Item = Result<PathEvent, Lagged>;
39
40    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41        Pin::new(&mut self.inner)
42            .poll_next(cx)
43            .map(|opt| opt.map(|res| res.map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged(n))))
44    }
45}
46
47/// A stream of NAT traversal updates for a connection.
48#[derive(Debug)]
49pub struct NatTraversalUpdates {
50    inner: BroadcastStream<n0_nat_traversal::Event>,
51}
52
53impl NatTraversalUpdates {
54    pub(crate) fn new(rx: broadcast::Receiver<n0_nat_traversal::Event>) -> Self {
55        Self {
56            inner: BroadcastStream::new(rx),
57        }
58    }
59}
60
61impl Stream for NatTraversalUpdates {
62    type Item = Result<n0_nat_traversal::Event, Lagged>;
63
64    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65        Pin::new(&mut self.inner)
66            .poll_next(cx)
67            .map(|opt| opt.map(|res| res.map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged(n))))
68    }
69}
70
71/// Watches the external address reported by the peer for this connection.
72///
73/// Implements [`Stream`] yielding `SocketAddr` on each change.
74/// Use [`get`](Self::get) to read the current value without waiting.
75pub struct ObservedExternalAddr {
76    rx: watch::Receiver<Option<SocketAddr>>,
77    stream: WatchStream<Option<SocketAddr>>,
78}
79
80impl ObservedExternalAddr {
81    pub(crate) fn new(rx: watch::Receiver<Option<SocketAddr>>) -> Self {
82        let stream = WatchStream::new(rx.clone());
83        Self { rx, stream }
84    }
85
86    /// Returns the most recently observed external address.
87    ///
88    /// `None` is returned if the peer has not yet reported an address. Retains
89    /// the last value even after the stream is closed.
90    pub fn get(&self) -> Option<SocketAddr> {
91        *self.rx.borrow()
92    }
93}
94
95impl Stream for ObservedExternalAddr {
96    type Item = SocketAddr;
97
98    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
99        loop {
100            match Pin::new(&mut self.stream).poll_next(cx) {
101                Poll::Ready(Some(Some(addr))) => return Poll::Ready(Some(addr)),
102                Poll::Ready(Some(None)) => continue,
103                Poll::Ready(None) => return Poll::Ready(None),
104                Poll::Pending => return Poll::Pending,
105            }
106        }
107    }
108}
109
110impl fmt::Debug for ObservedExternalAddr {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        f.debug_struct("ObservedExternalAddr").finish()
113    }
114}