1use std::fmt;
4use std::net::SocketAddr;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use proto::PathEvent;
9use proto::n0_nat_traversal;
10use thiserror::Error;
11use tokio::sync::{broadcast, watch};
12use tokio_stream::Stream;
13use tokio_stream::wrappers::{BroadcastStream, WatchStream, errors::BroadcastStreamRecvError};
14
15#[derive(Debug, Clone, PartialEq, Eq, Error)]
20#[error("channel lagged by {0}")]
21pub struct Lagged(pub u64);
22
23#[derive(Debug)]
25pub struct PathEvents {
26 inner: BroadcastStream<PathEvent>,
27}
28
29impl PathEvents {
30 pub(crate) fn new(rx: broadcast::Receiver<PathEvent>) -> Self {
31 Self {
32 inner: BroadcastStream::new(rx),
33 }
34 }
35}
36
37impl Stream for PathEvents {
38 type Item = Result<PathEvent, Lagged>;
39
40 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41 Pin::new(&mut self.inner)
42 .poll_next(cx)
43 .map(|opt| opt.map(|res| res.map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged(n))))
44 }
45}
46
47#[derive(Debug)]
49pub struct NatTraversalUpdates {
50 inner: BroadcastStream<n0_nat_traversal::Event>,
51}
52
53impl NatTraversalUpdates {
54 pub(crate) fn new(rx: broadcast::Receiver<n0_nat_traversal::Event>) -> Self {
55 Self {
56 inner: BroadcastStream::new(rx),
57 }
58 }
59}
60
61impl Stream for NatTraversalUpdates {
62 type Item = Result<n0_nat_traversal::Event, Lagged>;
63
64 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 Pin::new(&mut self.inner)
66 .poll_next(cx)
67 .map(|opt| opt.map(|res| res.map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged(n))))
68 }
69}
70
71pub struct ObservedExternalAddr {
76 rx: watch::Receiver<Option<SocketAddr>>,
77 stream: WatchStream<Option<SocketAddr>>,
78}
79
80impl ObservedExternalAddr {
81 pub(crate) fn new(rx: watch::Receiver<Option<SocketAddr>>) -> Self {
82 let stream = WatchStream::new(rx.clone());
83 Self { rx, stream }
84 }
85
86 pub fn get(&self) -> Option<SocketAddr> {
91 *self.rx.borrow()
92 }
93}
94
95impl Stream for ObservedExternalAddr {
96 type Item = SocketAddr;
97
98 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
99 loop {
100 match Pin::new(&mut self.stream).poll_next(cx) {
101 Poll::Ready(Some(Some(addr))) => return Poll::Ready(Some(addr)),
102 Poll::Ready(Some(None)) => continue,
103 Poll::Ready(None) => return Poll::Ready(None),
104 Poll::Pending => return Poll::Pending,
105 }
106 }
107 }
108}
109
110impl fmt::Debug for ObservedExternalAddr {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 f.debug_struct("ObservedExternalAddr").finish()
113 }
114}