1use std::{
6 collections::{BTreeSet, HashSet},
7 pin::Pin,
8 task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use iroh_base::NodeId;
13use irpc::{channel::mpsc, rpc_requests, Client};
14use n0_future::{Stream, StreamExt, TryStreamExt};
15use nested_enum_utils::common_fields;
16use serde::{Deserialize, Serialize};
17use snafu::Snafu;
18
19use crate::proto::{DeliveryScope, TopicId};
20
21const TOPIC_EVENTS_DEFAULT_CAP: usize = 2048;
23const TOPIC_COMMANDS_CAP: usize = 64;
25
26#[rpc_requests(message = RpcMessage, rpc_feature = "rpc")]
28#[derive(Debug, Serialize, Deserialize)]
29pub(crate) enum Request {
30 #[rpc(tx=mpsc::Sender<Event>, rx=mpsc::Receiver<Command>)]
31 Join(JoinRequest),
32}
33
34#[derive(Debug, Serialize, Deserialize)]
35pub(crate) struct JoinRequest {
36 pub topic_id: TopicId,
37 pub bootstrap: BTreeSet<NodeId>,
38}
39
40#[allow(missing_docs)]
41#[common_fields({
42 backtrace: Option<snafu::Backtrace>,
43 #[snafu(implicit)]
44 span_trace: n0_snafu::SpanTrace,
45})]
46#[derive(Debug, Snafu)]
47#[non_exhaustive]
48pub enum ApiError {
49 #[snafu(transparent)]
50 Rpc { source: irpc::Error },
51 #[snafu(display("topic closed"))]
53 Closed {},
54}
55
56impl From<irpc::channel::SendError> for ApiError {
57 fn from(value: irpc::channel::SendError) -> Self {
58 irpc::Error::from(value).into()
59 }
60}
61
62impl From<irpc::channel::mpsc::RecvError> for ApiError {
63 fn from(value: irpc::channel::mpsc::RecvError) -> Self {
64 irpc::Error::from(value).into()
65 }
66}
67
68impl From<irpc::channel::oneshot::RecvError> for ApiError {
69 fn from(value: irpc::channel::oneshot::RecvError) -> Self {
70 irpc::Error::from(value).into()
71 }
72}
73
74#[derive(Debug, Clone)]
88pub struct GossipApi {
89 client: Client<Request>,
90}
91
92impl GossipApi {
93 #[cfg(feature = "net")]
94 pub(crate) fn local(tx: tokio::sync::mpsc::Sender<RpcMessage>) -> Self {
95 let local = irpc::LocalSender::<Request>::from(tx);
96 Self {
97 client: local.into(),
98 }
99 }
100
101 #[cfg(feature = "rpc")]
103 pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
104 let inner = irpc::Client::quinn(endpoint, addr);
105 Self { client: inner }
106 }
107
108 #[cfg(all(feature = "rpc", feature = "net"))]
110 pub(crate) async fn listen(&self, endpoint: quinn::Endpoint) {
111 use irpc::rpc::{listen, RemoteService};
112
113 let local = self
114 .client
115 .as_local()
116 .expect("cannot listen on remote client");
117 let handler = Request::remote_handler(local);
118
119 listen::<Request>(endpoint, handler).await
120 }
121
122 pub async fn subscribe_with_opts(
130 &self,
131 topic_id: TopicId,
132 opts: JoinOptions,
133 ) -> Result<GossipTopic, ApiError> {
134 let req = JoinRequest {
135 topic_id,
136 bootstrap: opts.bootstrap,
137 };
138 let (tx, rx) = self
139 .client
140 .bidi_streaming(req, TOPIC_COMMANDS_CAP, opts.subscription_capacity)
141 .await?;
142 Ok(GossipTopic::new(tx, rx))
143 }
144
145 pub async fn subscribe_and_join(
147 &self,
148 topic_id: TopicId,
149 bootstrap: Vec<NodeId>,
150 ) -> Result<GossipTopic, ApiError> {
151 let mut sub = self
152 .subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap))
153 .await?;
154 sub.joined().await?;
155 Ok(sub)
156 }
157
158 pub async fn subscribe(
164 &self,
165 topic_id: TopicId,
166 bootstrap: Vec<NodeId>,
167 ) -> Result<GossipTopic, ApiError> {
168 let sub = self
169 .subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap))
170 .await?;
171
172 Ok(sub)
173 }
174}
175
176#[derive(Debug, Clone)]
178pub struct GossipSender(mpsc::Sender<Command>);
179
180impl GossipSender {
181 pub(crate) fn new(sender: mpsc::Sender<Command>) -> Self {
182 Self(sender)
183 }
184
185 pub async fn broadcast(&self, message: Bytes) -> Result<(), ApiError> {
187 self.send(Command::Broadcast(message)).await?;
188 Ok(())
189 }
190
191 pub async fn broadcast_neighbors(&self, message: Bytes) -> Result<(), ApiError> {
193 self.send(Command::BroadcastNeighbors(message)).await?;
194 Ok(())
195 }
196
197 pub async fn join_peers(&self, peers: Vec<NodeId>) -> Result<(), ApiError> {
199 self.send(Command::JoinPeers(peers)).await?;
200 Ok(())
201 }
202
203 async fn send(&self, command: Command) -> Result<(), irpc::channel::SendError> {
204 self.0.send(command).await?;
205 Ok(())
206 }
207}
208
209#[derive(Debug)]
218pub struct GossipTopic {
219 sender: GossipSender,
220 receiver: GossipReceiver,
221}
222
223impl GossipTopic {
224 pub(crate) fn new(sender: mpsc::Sender<Command>, receiver: mpsc::Receiver<Event>) -> Self {
225 let sender = GossipSender::new(sender);
226 Self {
227 sender,
228 receiver: GossipReceiver::new(receiver),
229 }
230 }
231
232 pub fn split(self) -> (GossipSender, GossipReceiver) {
234 (self.sender, self.receiver)
235 }
236
237 pub async fn broadcast(&mut self, message: Bytes) -> Result<(), ApiError> {
239 self.sender.broadcast(message).await
240 }
241
242 pub async fn broadcast_neighbors(&mut self, message: Bytes) -> Result<(), ApiError> {
244 self.sender.broadcast_neighbors(message).await
245 }
246
247 pub async fn joined(&mut self) -> Result<(), ApiError> {
251 self.receiver.joined().await
252 }
253
254 pub fn is_joined(&self) -> bool {
256 self.receiver.is_joined()
257 }
258}
259
260impl Stream for GossipTopic {
261 type Item = Result<Event, ApiError>;
262
263 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
264 Pin::new(&mut self.receiver).poll_next(cx)
265 }
266}
267
268#[derive(derive_more::Debug)]
272pub struct GossipReceiver {
273 #[debug("BoxStream")]
274 stream: Pin<Box<dyn Stream<Item = Result<Event, ApiError>> + Send + Sync + 'static>>,
275 neighbors: HashSet<NodeId>,
276}
277
278impl GossipReceiver {
279 pub(crate) fn new(events_rx: mpsc::Receiver<Event>) -> Self {
280 let stream = events_rx.into_stream().map_err(ApiError::from);
281 let stream = Box::pin(stream);
282 Self {
283 stream,
284 neighbors: Default::default(),
285 }
286 }
287
288 pub fn neighbors(&self) -> impl Iterator<Item = NodeId> + '_ {
290 self.neighbors.iter().copied()
291 }
292
293 pub async fn joined(&mut self) -> Result<(), ApiError> {
301 while !self.is_joined() {
302 let _event = self.next().await.ok_or(ClosedSnafu.build())??;
303 }
304 Ok(())
305 }
306
307 pub fn is_joined(&self) -> bool {
309 !self.neighbors.is_empty()
310 }
311}
312
313impl Stream for GossipReceiver {
314 type Item = Result<Event, ApiError>;
315
316 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
317 let item = std::task::ready!(Pin::new(&mut self.stream).poll_next(cx));
318 if let Some(Ok(item)) = &item {
319 match item {
320 Event::NeighborUp(node_id) => {
321 self.neighbors.insert(*node_id);
322 }
323 Event::NeighborDown(node_id) => {
324 self.neighbors.remove(node_id);
325 }
326 _ => {}
327 }
328 }
329 Poll::Ready(item)
330 }
331}
332
333#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
337pub enum Event {
338 NeighborUp(NodeId),
340 NeighborDown(NodeId),
342 Received(Message),
344 Lagged,
346}
347
348impl From<crate::proto::Event<NodeId>> for Event {
349 fn from(event: crate::proto::Event<NodeId>) -> Self {
350 match event {
351 crate::proto::Event::NeighborUp(node_id) => Self::NeighborUp(node_id),
352 crate::proto::Event::NeighborDown(node_id) => Self::NeighborDown(node_id),
353 crate::proto::Event::Received(message) => Self::Received(Message {
354 content: message.content,
355 scope: message.scope,
356 delivered_from: message.delivered_from,
357 }),
358 }
359 }
360}
361
362#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, derive_more::Debug, Serialize, Deserialize)]
364pub struct Message {
365 #[debug("Bytes({})", self.content.len())]
367 pub content: Bytes,
368 pub scope: DeliveryScope,
371 pub delivered_from: NodeId,
373}
374
375#[derive(Serialize, Deserialize, derive_more::Debug, Clone, strum::Display)]
377pub enum Command {
378 Broadcast(#[debug("Bytes({})", _0.len())] Bytes),
380 BroadcastNeighbors(#[debug("Bytes({})", _0.len())] Bytes),
382 JoinPeers(Vec<NodeId>),
384}
385
386impl From<Command> for crate::proto::Command<NodeId> {
387 fn from(value: Command) -> Self {
388 match value {
389 Command::Broadcast(bytes) => Self::Broadcast(bytes, crate::proto::Scope::Swarm),
390 Command::BroadcastNeighbors(bytes) => {
391 Self::Broadcast(bytes, crate::proto::Scope::Neighbors)
392 }
393 Command::JoinPeers(peers) => Self::Join(peers),
394 }
395 }
396}
397
398#[derive(Serialize, Deserialize, Debug)]
400pub struct JoinOptions {
401 pub bootstrap: BTreeSet<NodeId>,
403 pub subscription_capacity: usize,
411}
412
413impl JoinOptions {
414 pub fn with_bootstrap(nodes: impl IntoIterator<Item = NodeId>) -> Self {
417 Self {
418 bootstrap: nodes.into_iter().collect(),
419 subscription_capacity: TOPIC_EVENTS_DEFAULT_CAP,
420 }
421 }
422}
423
424#[cfg(test)]
425mod tests {
426 use crate::api::GossipTopic;
427
428 #[cfg(all(feature = "rpc", feature = "net"))]
429 #[tokio::test]
430 #[tracing_test::traced_test]
431 async fn test_rpc() -> n0_snafu::Result {
432 use iroh::{discovery::static_provider::StaticProvider, protocol::Router, RelayMap};
433 use n0_future::{time::Duration, StreamExt};
434 use n0_snafu::{Error, Result, ResultExt};
435 use rand_chacha::rand_core::SeedableRng;
436
437 use crate::{
438 api::{Event, GossipApi},
439 net::{tests::create_endpoint, Gossip},
440 proto::TopicId,
441 ALPN,
442 };
443
444 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
445 let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
446
447 async fn create_gossip_endpoint(
448 rng: &mut rand_chacha::ChaCha12Rng,
449 relay_map: RelayMap,
450 ) -> Result<(Router, Gossip)> {
451 let endpoint = create_endpoint(rng, relay_map, None).await?;
452 let gossip = Gossip::builder().spawn(endpoint.clone());
453 let router = Router::builder(endpoint)
454 .accept(ALPN, gossip.clone())
455 .spawn();
456 Ok((router, gossip))
457 }
458
459 let topic_id = TopicId::from_bytes([0u8; 32]);
460
461 let (router, gossip) = create_gossip_endpoint(&mut rng, relay_map.clone()).await?;
463
464 let (node2_id, node2_addr, node2_task) = {
466 let (router, gossip) = create_gossip_endpoint(&mut rng, relay_map.clone()).await?;
467 let node_addr = router.endpoint().node_addr();
468 let node_id = router.endpoint().node_id();
469 let task = tokio::task::spawn(async move {
470 let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
471 topic.broadcast(b"hello".to_vec().into()).await?;
472 Result::<_, Error>::Ok(router)
473 });
474 (node_id, node_addr, task)
475 };
476
477 let static_provider = StaticProvider::new();
479 static_provider.add_node_info(node2_addr);
480
481 router.endpoint().discovery().add(static_provider);
482
483 let (rpc_server_endpoint, rpc_server_cert) =
485 irpc::util::make_server_endpoint("127.0.0.1:0".parse().unwrap())
486 .map_err(Error::anyhow)?;
487 let rpc_server_addr = rpc_server_endpoint.local_addr().e()?;
488 let rpc_server_task = tokio::task::spawn(async move {
489 gossip.listen(rpc_server_endpoint).await;
490 });
491
492 let rpc_client_endpoint =
494 irpc::util::make_client_endpoint("127.0.0.1:0".parse().unwrap(), &[&rpc_server_cert])
495 .map_err(Error::anyhow)?;
496 let rpc_client = GossipApi::connect(rpc_client_endpoint, rpc_server_addr);
497
498 let recv = async move {
500 let mut topic = rpc_client
501 .subscribe_and_join(topic_id, vec![node2_id])
502 .await?;
503 while let Some(event) = topic.try_next().await? {
505 match event {
506 Event::Received(message) => {
507 assert_eq!(&message.content[..], b"hello");
508 break;
509 }
510 Event::Lagged => panic!("unexpected lagged event"),
511 _ => {}
512 }
513 }
514 Result::<_, Error>::Ok(())
515 };
516
517 tokio::time::timeout(Duration::from_secs(10), recv)
519 .await
520 .e()??;
521
522 rpc_server_task.abort();
524 router.shutdown().await.e()?;
525 let router2 = node2_task.await.e()??;
526 router2.shutdown().await.e()?;
527 Ok(())
528 }
529
530 #[test]
531 fn ensure_gossip_topic_is_sync() {
532 #[allow(unused)]
533 fn get() -> GossipTopic {
534 unimplemented!()
535 }
536 #[allow(unused)]
537 fn check(_t: impl Sync) {}
538 #[allow(unused)]
539 fn foo() {
540 check(get());
541 }
542 }
543}