iroh_gossip/api.rs

//! Public API for using iroh-gossip
//!
//! The API is usable both locally and over RPC.

use std::{
    collections::{BTreeSet, HashSet},
    pin::Pin,
    task::{Context, Poll},
};

use bytes::Bytes;
use iroh_base::NodeId;
use irpc::{channel::mpsc, rpc_requests, Client};
use n0_future::{Stream, StreamExt, TryStreamExt};
use nested_enum_utils::common_fields;
use serde::{Deserialize, Serialize};
use snafu::Snafu;

use crate::proto::{DeliveryScope, TopicId};

/// Default channel capacity for topic subscription channels (one per topic).
const TOPIC_EVENTS_DEFAULT_CAP: usize = 2048;
/// Channel capacity for topic command send channels.
const TOPIC_COMMANDS_CAP: usize = 64;

/// Input messages for the gossip actor.
#[rpc_requests(message = RpcMessage, rpc_feature = "rpc")]
#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum Request {
    #[rpc(tx=mpsc::Sender<Event>, rx=mpsc::Receiver<Command>)]
    Join(JoinRequest),
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct JoinRequest {
    pub topic_id: TopicId,
    pub bootstrap: BTreeSet<NodeId>,
}

#[allow(missing_docs)]
#[common_fields({
    backtrace: Option<snafu::Backtrace>,
    #[snafu(implicit)]
    span_trace: n0_snafu::SpanTrace,
})]
#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum ApiError {
    #[snafu(transparent)]
    Rpc { source: irpc::Error },
    /// The gossip topic was closed.
    #[snafu(display("topic closed"))]
    Closed {},
}

impl From<irpc::channel::SendError> for ApiError {
    fn from(value: irpc::channel::SendError) -> Self {
        irpc::Error::from(value).into()
    }
}

impl From<irpc::channel::mpsc::RecvError> for ApiError {
    fn from(value: irpc::channel::mpsc::RecvError) -> Self {
        irpc::Error::from(value).into()
    }
}

impl From<irpc::channel::oneshot::RecvError> for ApiError {
    fn from(value: irpc::channel::oneshot::RecvError) -> Self {
        irpc::Error::from(value).into()
    }
}

/// API to control a [`Gossip`] instance.
///
/// This has methods to subscribe and join gossip topics, which return handles to publish
/// and receive messages on topics.
///
/// [`Gossip`] derefs to [`GossipApi`], so all functions on [`GossipApi`] are directly callable
/// from [`Gossip`].
///
/// Additionally, a [`GossipApi`] can be created by connecting to an RPC server. See [`Gossip::listen`]
/// and [`GossipApi::connect`] (requires the `rpc` feature).
///
/// [`Gossip`]: crate::net::Gossip
/// [`Gossip::listen`]: crate::net::Gossip::listen
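///
/// # Example
///
/// A minimal sketch of local usage, mirroring the setup used in this crate's
/// tests. The endpoint construction and the `bootstrap` list of [`NodeId`]s
/// are placeholders, not fixed API:
///
/// ```ignore
/// use iroh::{protocol::Router, Endpoint};
/// use iroh_gossip::{net::Gossip, proto::TopicId, ALPN};
///
/// // Bind an endpoint and spawn a gossip instance on top of it.
/// let endpoint = Endpoint::builder().bind().await?;
/// let gossip = Gossip::builder().spawn(endpoint.clone());
/// let _router = Router::builder(endpoint).accept(ALPN, gossip.clone()).spawn();
///
/// // `Gossip` derefs to `GossipApi`, so we can subscribe directly.
/// let topic_id = TopicId::from_bytes([0u8; 32]);
/// let topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
/// ```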
#[derive(Debug, Clone)]
pub struct GossipApi {
    client: Client<Request>,
}

impl GossipApi {
    #[cfg(feature = "net")]
    pub(crate) fn local(tx: tokio::sync::mpsc::Sender<RpcMessage>) -> Self {
        let local = irpc::LocalSender::<Request>::from(tx);
        Self {
            client: local.into(),
        }
    }

    /// Connect to a remote as an RPC client.
    #[cfg(feature = "rpc")]
    pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
        let inner = irpc::Client::quinn(endpoint, addr);
        Self { client: inner }
    }

    /// Listen on a quinn endpoint for incoming RPC connections.
    #[cfg(all(feature = "rpc", feature = "net"))]
    pub(crate) async fn listen(&self, endpoint: quinn::Endpoint) {
        use irpc::rpc::{listen, RemoteService};

        let local = self
            .client
            .as_local()
            .expect("cannot listen on remote client");
        let handler = Request::remote_handler(local);

        listen::<Request>(endpoint, handler).await
    }

    /// Join a gossip topic with options.
    ///
    /// Returns a [`GossipTopic`] immediately. To wait for at least one connection to be established,
    /// you can await [`GossipTopic::joined`].
    ///
    /// Messages will be queued until the first connection is available. If the internal channel becomes full,
    /// the oldest messages will be dropped from the channel.
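    ///
    /// A sketch under assumed context (`gossip`, `topic_id`, and
    /// `bootstrap_nodes` are placeholders):
    ///
    /// ```ignore
    /// let opts = JoinOptions::with_bootstrap(bootstrap_nodes);
    /// let mut topic = gossip.subscribe_with_opts(topic_id, opts).await?;
    /// // Broadcasts are queued until the first connection is established.
    /// topic.broadcast(b"hello".to_vec().into()).await?;
    /// ```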
    pub async fn subscribe_with_opts(
        &self,
        topic_id: TopicId,
        opts: JoinOptions,
    ) -> Result<GossipTopic, ApiError> {
        let req = JoinRequest {
            topic_id,
            bootstrap: opts.bootstrap,
        };
        let (tx, rx) = self
            .client
            .bidi_streaming(req, TOPIC_COMMANDS_CAP, opts.subscription_capacity)
            .await?;
        Ok(GossipTopic::new(tx, rx))
    }

    /// Join a gossip topic with the default options and wait for at least one active connection.
    pub async fn subscribe_and_join(
        &self,
        topic_id: TopicId,
        bootstrap: Vec<NodeId>,
    ) -> Result<GossipTopic, ApiError> {
        let mut sub = self
            .subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap))
            .await?;
        sub.joined().await?;
        Ok(sub)
    }

    /// Join a gossip topic with the default options.
    ///
    /// Note that this will not wait for any bootstrap node to be available.
    /// To ensure the topic is connected to at least one node, use [`GossipTopic::joined`]
    /// or [`Self::subscribe_and_join`].
    pub async fn subscribe(
        &self,
        topic_id: TopicId,
        bootstrap: Vec<NodeId>,
    ) -> Result<GossipTopic, ApiError> {
        let sub = self
            .subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap))
            .await?;

        Ok(sub)
    }
}

/// Sender for a gossip topic.
#[derive(Debug, Clone)]
pub struct GossipSender(mpsc::Sender<Command>);

impl GossipSender {
    pub(crate) fn new(sender: mpsc::Sender<Command>) -> Self {
        Self(sender)
    }

    /// Broadcasts a message to all nodes.
    pub async fn broadcast(&self, message: Bytes) -> Result<(), ApiError> {
        self.send(Command::Broadcast(message)).await?;
        Ok(())
    }

    /// Broadcasts a message to our direct neighbors.
    pub async fn broadcast_neighbors(&self, message: Bytes) -> Result<(), ApiError> {
        self.send(Command::BroadcastNeighbors(message)).await?;
        Ok(())
    }

    /// Joins a set of peers.
    pub async fn join_peers(&self, peers: Vec<NodeId>) -> Result<(), ApiError> {
        self.send(Command::JoinPeers(peers)).await?;
        Ok(())
    }

    async fn send(&self, command: Command) -> Result<(), irpc::channel::SendError> {
        self.0.send(command).await?;
        Ok(())
    }
}

/// Subscribed gossip topic.
///
/// This handle is a [`Stream`] of [`Event`]s from the topic, and can be used to send messages.
///
/// Once the [`GossipTopic`] is dropped, the network actor will leave the gossip topic.
///
/// It may be split into sender and receiver parts with [`Self::split`]. In this case, the topic will
/// be left once both the [`GossipSender`] and [`GossipReceiver`] halves are dropped.
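///
/// A sketch of splitting into independently owned halves (`topic` is assumed
/// to come from [`GossipApi::subscribe_and_join`]):
///
/// ```ignore
/// let (sender, mut receiver) = topic.split();
/// // The topic is only left once both halves are dropped.
/// sender.broadcast(b"hi".to_vec().into()).await?;
/// let first_event = receiver.try_next().await?;
/// ```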
#[derive(Debug)]
pub struct GossipTopic {
    sender: GossipSender,
    receiver: GossipReceiver,
}

impl GossipTopic {
    pub(crate) fn new(sender: mpsc::Sender<Command>, receiver: mpsc::Receiver<Event>) -> Self {
        let sender = GossipSender::new(sender);
        Self {
            sender,
            receiver: GossipReceiver::new(receiver),
        }
    }

    /// Splits `self` into [`GossipSender`] and [`GossipReceiver`] parts.
    pub fn split(self) -> (GossipSender, GossipReceiver) {
        (self.sender, self.receiver)
    }

    /// Sends a message to all peers.
    pub async fn broadcast(&mut self, message: Bytes) -> Result<(), ApiError> {
        self.sender.broadcast(message).await
    }

    /// Sends a message to our direct neighbors in the swarm.
    pub async fn broadcast_neighbors(&mut self, message: Bytes) -> Result<(), ApiError> {
        self.sender.broadcast_neighbors(message).await
    }

    /// Waits until we are connected to at least one node.
    ///
    /// See [`GossipReceiver::joined`] for details.
    pub async fn joined(&mut self) -> Result<(), ApiError> {
        self.receiver.joined().await
    }

    /// Returns `true` if we are connected to at least one node.
    pub fn is_joined(&self) -> bool {
        self.receiver.is_joined()
    }
}

impl Stream for GossipTopic {
    type Item = Result<Event, ApiError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.receiver).poll_next(cx)
    }
}

/// Receiver for gossip events on a topic.
///
/// This is a [`Stream`] of [`Event`]s emitted from the topic.
#[derive(derive_more::Debug)]
pub struct GossipReceiver {
    #[debug("BoxStream")]
    stream: Pin<Box<dyn Stream<Item = Result<Event, ApiError>> + Send + Sync + 'static>>,
    neighbors: HashSet<NodeId>,
}

impl GossipReceiver {
    pub(crate) fn new(events_rx: mpsc::Receiver<Event>) -> Self {
        let stream = events_rx.into_stream().map_err(ApiError::from);
        let stream = Box::pin(stream);
        Self {
            stream,
            neighbors: Default::default(),
        }
    }

    /// Lists our current direct neighbors.
    pub fn neighbors(&self) -> impl Iterator<Item = NodeId> + '_ {
        self.neighbors.iter().copied()
    }

    /// Waits until we are connected to at least one node.
    ///
    /// Progresses the event stream to the first [`Event::NeighborUp`] event.
    ///
    /// Note that this consumes the initial `NeighborUp` event. If you want to track
    /// neighbors, use [`Self::neighbors`] after awaiting [`Self::joined`], and then
    /// continue to track `NeighborUp` events on the event stream.
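    ///
    /// A sketch of the pattern described above (`receiver` is assumed to be a
    /// mutable [`GossipReceiver`] obtained via [`GossipTopic::split`]):
    ///
    /// ```ignore
    /// receiver.joined().await?;
    /// // The neighbors seen so far are already tracked at this point.
    /// let mut neighbors: Vec<_> = receiver.neighbors().collect();
    /// while let Some(event) = receiver.try_next().await? {
    ///     match event {
    ///         Event::NeighborUp(node) => neighbors.push(node),
    ///         Event::NeighborDown(node) => neighbors.retain(|n| *n != node),
    ///         _ => {}
    ///     }
    /// }
    /// ```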
    pub async fn joined(&mut self) -> Result<(), ApiError> {
        while !self.is_joined() {
            let _event = self.next().await.ok_or(ClosedSnafu.build())??;
        }
        Ok(())
    }

    /// Returns `true` if we are connected to at least one node.
    pub fn is_joined(&self) -> bool {
        !self.neighbors.is_empty()
    }
}

impl Stream for GossipReceiver {
    type Item = Result<Event, ApiError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let item = std::task::ready!(Pin::new(&mut self.stream).poll_next(cx));
        if let Some(Ok(item)) = &item {
            match item {
                Event::NeighborUp(node_id) => {
                    self.neighbors.insert(*node_id);
                }
                Event::NeighborDown(node_id) => {
                    self.neighbors.remove(node_id);
                }
                _ => {}
            }
        }
        Poll::Ready(item)
    }
}

/// Events emitted from a gossip topic.
///
/// These are the events emitted from a [`GossipReceiver`].
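///
/// A sketch of a typical event loop (`receiver` is a placeholder for a
/// [`GossipReceiver`] or [`GossipTopic`]):
///
/// ```ignore
/// while let Some(event) = receiver.try_next().await? {
///     match event {
///         Event::Received(message) => println!("got: {:?}", message.content),
///         Event::NeighborUp(node_id) => println!("neighbor up: {node_id}"),
///         Event::NeighborDown(node_id) => println!("neighbor down: {node_id}"),
///         Event::Lagged => break, // we missed events; consider resubscribing
///     }
/// }
/// ```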
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub enum Event {
    /// We have a new, direct neighbor in the swarm membership layer for this topic.
    NeighborUp(NodeId),
    /// We dropped a direct neighbor in the swarm membership layer for this topic.
    NeighborDown(NodeId),
    /// We received a gossip message for this topic.
    Received(Message),
    /// We missed some messages because our [`GossipReceiver`] was not progressing fast enough.
    Lagged,
}

impl From<crate::proto::Event<NodeId>> for Event {
    fn from(event: crate::proto::Event<NodeId>) -> Self {
        match event {
            crate::proto::Event::NeighborUp(node_id) => Self::NeighborUp(node_id),
            crate::proto::Event::NeighborDown(node_id) => Self::NeighborDown(node_id),
            crate::proto::Event::Received(message) => Self::Received(Message {
                content: message.content,
                scope: message.scope,
                delivered_from: message.delivered_from,
            }),
        }
    }
}

/// A gossip message.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, derive_more::Debug, Serialize, Deserialize)]
pub struct Message {
    /// The content of the message.
    #[debug("Bytes({})", self.content.len())]
    pub content: Bytes,
    /// The scope of the message.
    /// This tells us whether the message is from a direct neighbor or actual gossip.
    pub scope: DeliveryScope,
    /// The node that delivered the message. This is not necessarily the same as the original author.
    pub delivered_from: NodeId,
}

/// Command for a gossip topic.
#[derive(Serialize, Deserialize, derive_more::Debug, Clone, strum::Display)]
pub enum Command {
    /// Broadcasts a message to all nodes in the swarm.
    Broadcast(#[debug("Bytes({})", _0.len())] Bytes),
    /// Broadcasts a message to all direct neighbors.
    BroadcastNeighbors(#[debug("Bytes({})", _0.len())] Bytes),
    /// Connects to a set of peers.
    JoinPeers(Vec<NodeId>),
}

impl From<Command> for crate::proto::Command<NodeId> {
    fn from(value: Command) -> Self {
        match value {
            Command::Broadcast(bytes) => Self::Broadcast(bytes, crate::proto::Scope::Swarm),
            Command::BroadcastNeighbors(bytes) => {
                Self::Broadcast(bytes, crate::proto::Scope::Neighbors)
            }
            Command::JoinPeers(peers) => Self::Join(peers),
        }
    }
}

/// Options for joining a gossip topic.
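///
/// A sketch with a custom buffer size (the `512` is an arbitrary example
/// value and `bootstrap_nodes` a placeholder; see
/// [`JoinOptions::with_bootstrap`] for the default capacity):
///
/// ```ignore
/// let opts = JoinOptions {
///     bootstrap: bootstrap_nodes.into_iter().collect(),
///     subscription_capacity: 512,
/// };
/// ```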
#[derive(Serialize, Deserialize, Debug)]
pub struct JoinOptions {
    /// The initial bootstrap nodes.
    pub bootstrap: BTreeSet<NodeId>,
    /// The maximum number of messages that can be buffered in a subscription.
    ///
    /// If this limit is reached, the subscriber will receive a `Lagged` response,
    /// the message will be dropped, and the subscriber will be closed.
    ///
    /// This is to prevent a single slow subscriber from blocking the dispatch loop.
    /// If a subscriber is lagging, it should be closed and re-opened.
    pub subscription_capacity: usize,
}

impl JoinOptions {
    /// Creates [`JoinOptions`] with the provided bootstrap nodes and the default subscription
    /// capacity.
    pub fn with_bootstrap(nodes: impl IntoIterator<Item = NodeId>) -> Self {
        Self {
            bootstrap: nodes.into_iter().collect(),
            subscription_capacity: TOPIC_EVENTS_DEFAULT_CAP,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::api::GossipTopic;

    #[cfg(all(feature = "rpc", feature = "net"))]
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_rpc() -> n0_snafu::Result {
        use iroh::{discovery::static_provider::StaticProvider, protocol::Router, RelayMap};
        use n0_future::{time::Duration, StreamExt};
        use n0_snafu::{Error, Result, ResultExt};
        use rand_chacha::rand_core::SeedableRng;

        use crate::{
            api::{Event, GossipApi},
            net::{tests::create_endpoint, Gossip},
            proto::TopicId,
            ALPN,
        };

        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        async fn create_gossip_endpoint(
            rng: &mut rand_chacha::ChaCha12Rng,
            relay_map: RelayMap,
        ) -> Result<(Router, Gossip)> {
            let endpoint = create_endpoint(rng, relay_map, None).await?;
            let gossip = Gossip::builder().spawn(endpoint.clone());
            let router = Router::builder(endpoint)
                .accept(ALPN, gossip.clone())
                .spawn();
            Ok((router, gossip))
        }

        let topic_id = TopicId::from_bytes([0u8; 32]);

        // create our gossip node
        let (router, gossip) = create_gossip_endpoint(&mut rng, relay_map.clone()).await?;

        // create a second node so that we can test actually joining
        let (node2_id, node2_addr, node2_task) = {
            let (router, gossip) = create_gossip_endpoint(&mut rng, relay_map.clone()).await?;
            let node_addr = router.endpoint().node_addr();
            let node_id = router.endpoint().node_id();
            let task = tokio::task::spawn(async move {
                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
                topic.broadcast(b"hello".to_vec().into()).await?;
                Result::<_, Error>::Ok(router)
            });
            (node_id, node_addr, task)
        };

        // create static provider to add node addr manually
        let static_provider = StaticProvider::new();
        static_provider.add_node_info(node2_addr);

        router.endpoint().discovery().add(static_provider);

        // expose the gossip node over RPC
        let (rpc_server_endpoint, rpc_server_cert) =
            irpc::util::make_server_endpoint("127.0.0.1:0".parse().unwrap())
                .map_err(Error::anyhow)?;
        let rpc_server_addr = rpc_server_endpoint.local_addr().e()?;
        let rpc_server_task = tokio::task::spawn(async move {
            gossip.listen(rpc_server_endpoint).await;
        });

        // connect to the RPC node with a new client
        let rpc_client_endpoint =
            irpc::util::make_client_endpoint("127.0.0.1:0".parse().unwrap(), &[&rpc_server_cert])
                .map_err(Error::anyhow)?;
        let rpc_client = GossipApi::connect(rpc_client_endpoint, rpc_server_addr);

        // join via RPC
        let recv = async move {
            let mut topic = rpc_client
                .subscribe_and_join(topic_id, vec![node2_id])
                .await?;
            // wait for a message
            while let Some(event) = topic.try_next().await? {
                match event {
                    Event::Received(message) => {
                        assert_eq!(&message.content[..], b"hello");
                        break;
                    }
                    Event::Lagged => panic!("unexpected lagged event"),
                    _ => {}
                }
            }
            Result::<_, Error>::Ok(())
        };

        // timeout to not hang in case of failure
        tokio::time::timeout(Duration::from_secs(10), recv)
            .await
            .e()??;

        // shutdown
        rpc_server_task.abort();
        router.shutdown().await.e()?;
        let router2 = node2_task.await.e()??;
        router2.shutdown().await.e()?;
        Ok(())
    }

    #[test]
    fn ensure_gossip_topic_is_sync() {
        #[allow(unused)]
        fn get() -> GossipTopic {
            unimplemented!()
        }
        #[allow(unused)]
        fn check(_t: impl Sync) {}
        #[allow(unused)]
        fn foo() {
            check(get());
        }
    }
}