iroh_gossip/
net.rs

1//! Networking for the `iroh-gossip` protocol
2
3use std::{
4    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
5    net::SocketAddr,
6    pin::Pin,
7    sync::Arc,
8    task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_concurrency::stream::{stream_group, StreamGroup};
13use futures_util::FutureExt as _;
14use iroh::{
15    endpoint::Connection,
16    protocol::{AcceptError, ProtocolHandler},
17    Endpoint, NodeAddr, NodeId, PublicKey, RelayUrl, Watcher,
18};
19use irpc::WithChannels;
20use n0_future::{
21    task::{self, AbortOnDropHandle, JoinSet},
22    time::Instant,
23    Stream, StreamExt as _,
24};
25use nested_enum_utils::common_fields;
26use rand::{rngs::StdRng, SeedableRng};
27use serde::{Deserialize, Serialize};
28use snafu::Snafu;
29use tokio::sync::{broadcast, mpsc, oneshot};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, error_span, trace, warn, Instrument};
32
33use self::{
34    discovery::GossipDiscovery,
35    util::{RecvLoop, SendLoop, Timers},
36};
37use crate::{
38    api::{self, Command, Event, GossipApi, RpcMessage},
39    metrics::Metrics,
40    proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
41};
42
43mod discovery;
44mod util;
45
46/// ALPN protocol name
47pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";
48
49/// Channel capacity for the send queue (one per connection)
50const SEND_QUEUE_CAP: usize = 64;
51/// Channel capacity for the ToActor message queue (single)
52const TO_ACTOR_CAP: usize = 64;
53/// Channel capacity for the InEvent message queue (single)
54const IN_EVENT_CAP: usize = 1024;
55/// Channel capacity for broadcast subscriber event queue (one per topic)
56const TOPIC_EVENT_CAP: usize = 256;
57
58/// Events emitted from the gossip protocol
59pub type ProtoEvent = proto::Event<PublicKey>;
60/// Commands for the gossip protocol
61pub type ProtoCommand = proto::Command<PublicKey>;
62
63type InEvent = proto::InEvent<PublicKey>;
64type OutEvent = proto::OutEvent<PublicKey>;
65type Timer = proto::Timer<PublicKey>;
66type ProtoMessage = proto::Message<PublicKey>;
67
68/// Publish and subscribe on gossiping topics.
69///
70/// Each topic is a separate broadcast tree with separate memberships.
71///
72/// A topic has to be joined before you can publish or subscribe on the topic.
73/// To join the swarm for a topic, you have to know the [`PublicKey`] of at least one peer that also joined the topic.
74///
75/// Messages published on the swarm will be delivered to all peers that joined the swarm for that
76/// topic. You will also be relaying (gossiping) messages published by other peers.
77///
78/// With the default settings, the protocol will maintain up to 5 peer connections per topic.
79///
80/// Even though the [`Gossip`] is created from an [`Endpoint`], it does not accept connections
81/// itself. You should run an accept loop on the [`Endpoint`] yourself, check the ALPN protocol of incoming
82/// connections, and if the ALPN protocol equals [`GOSSIP_ALPN`], forward the connection to the
83/// gossip actor through [`Self::handle_connection`].
84///
85/// The gossip actor will, however, initiate new connections to other peers by itself.
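///
/// # Example
///
/// A minimal usage sketch, modeled on the tests in this module; `bootstrap_node_id` is a
/// placeholder for a peer that has already joined the topic, and the endpoint setup is
/// abbreviated:
///
/// ```rust,ignore
/// use iroh::{protocol::Router, Endpoint};
/// use iroh_gossip::{net::{Gossip, GOSSIP_ALPN}, proto::TopicId};
///
/// let endpoint = Endpoint::builder().bind().await?;
/// let gossip = Gossip::builder().spawn(endpoint.clone());
/// // Let an iroh router run the accept loop and forward gossip connections to us.
/// let router = Router::builder(endpoint).accept(GOSSIP_ALPN, gossip.clone()).spawn();
///
/// // Join the topic with one bootstrap peer, then publish and receive messages.
/// let topic: TopicId = blake3::hash(b"my-topic").into();
/// let (sink, mut stream) = gossip
///     .subscribe_and_join(topic, vec![bootstrap_node_id])
///     .await?
///     .split();
/// sink.broadcast(bytes::Bytes::from_static(b"hello")).await?;
/// while let Some(event) = stream.try_next().await? {
///     // handle Event::Received, Event::NeighborUp, ...
/// }
/// ```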
86#[derive(Debug, Clone)]
87pub struct Gossip {
88    pub(crate) inner: Arc<Inner>,
89}
90
91impl std::ops::Deref for Gossip {
92    type Target = GossipApi;
93    fn deref(&self) -> &Self::Target {
94        &self.inner.api
95    }
96}
97
98#[derive(Debug)]
99enum LocalActorMessage {
100    HandleConnection(Connection),
101    Shutdown { reply: oneshot::Sender<()> },
102}
103
104#[allow(missing_docs)]
105#[common_fields({
106    backtrace: Option<snafu::Backtrace>,
107    #[snafu(implicit)]
108    span_trace: n0_snafu::SpanTrace,
109})]
110#[derive(Debug, Snafu)]
111#[non_exhaustive]
112pub enum Error {
113    ActorDropped {},
114}
115
116impl<T> From<mpsc::error::SendError<T>> for Error {
117    fn from(_value: mpsc::error::SendError<T>) -> Self {
118        ActorDroppedSnafu.build()
119    }
120}
121impl From<oneshot::error::RecvError> for Error {
122    fn from(_value: oneshot::error::RecvError) -> Self {
123        ActorDroppedSnafu.build()
124    }
125}
126
127#[derive(Debug)]
128pub(crate) struct Inner {
129    api: GossipApi,
130    local_tx: mpsc::Sender<LocalActorMessage>,
131    _actor_handle: AbortOnDropHandle<()>,
132    max_message_size: usize,
133    metrics: Arc<Metrics>,
134}
135
136impl ProtocolHandler for Gossip {
137    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
138        self.handle_connection(connection)
139            .await
140            .map_err(AcceptError::from_err)?;
141        Ok(())
142    }
143
144    async fn shutdown(&self) {
145        if let Err(err) = self.shutdown().await {
146            warn!("error while shutting down gossip: {err:#}");
147        }
148    }
149}
150
151/// Builder to configure and construct [`Gossip`].
152#[derive(Debug, Clone)]
153pub struct Builder {
154    config: proto::Config,
155    alpn: Option<Bytes>,
156}
157
158impl Builder {
159    /// Sets the maximum message size in bytes.
160    /// By default this is `4096` bytes.
161    pub fn max_message_size(mut self, size: usize) -> Self {
162        self.config.max_message_size = size;
163        self
164    }
165
166    /// Set the membership configuration.
167    pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
168        self.config.membership = config;
169        self
170    }
171
172    /// Set the broadcast configuration.
173    pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
174        self.config.broadcast = config;
175        self
176    }
177
178    /// Set the ALPN this gossip instance uses.
179    ///
180    /// It has to be the same for all peers in the network. If you set a custom ALPN,
181    /// you have to use the same ALPN when registering the [`Gossip`] on an iroh
182    /// router with [`RouterBuilder::accept`].
183    ///
184    /// [`RouterBuilder::accept`]: iroh::protocol::RouterBuilder::accept
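    ///
    /// A sketch of using a custom ALPN consistently on both the gossip builder and the
    /// router registration (the ALPN value below is only an example):
    ///
    /// ```rust,ignore
    /// const MY_GOSSIP_ALPN: &[u8] = b"/my-app/gossip/1";
    ///
    /// let gossip = Gossip::builder().alpn(MY_GOSSIP_ALPN).spawn(endpoint.clone());
    /// let router = Router::builder(endpoint).accept(MY_GOSSIP_ALPN, gossip.clone()).spawn();
    /// ```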
185    pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
186        self.alpn = Some(alpn.as_ref().to_vec().into());
187        self
188    }
189
190    /// Spawn a gossip actor and get a handle for it
191    pub fn spawn(self, endpoint: Endpoint) -> Gossip {
192        let metrics = Arc::new(Metrics::default());
193        let discovery = GossipDiscovery::default();
194        endpoint.discovery().add(discovery.clone());
195        let (actor, rpc_tx, local_tx) =
196            Actor::new(endpoint, self.config, metrics.clone(), self.alpn, discovery);
197        let me = actor.endpoint.node_id().fmt_short();
198        let max_message_size = actor.state.max_message_size();
199
200        let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));
201
202        let api = GossipApi::local(rpc_tx);
203
204        Gossip {
205            inner: Inner {
206                api,
207                local_tx,
208                _actor_handle: AbortOnDropHandle::new(actor_handle),
209                max_message_size,
210                metrics,
211            }
212            .into(),
213        }
214    }
215}
216
217impl Gossip {
218    /// Creates a default `Builder`. The [`Endpoint`] is provided later via [`Builder::spawn`].
219    pub fn builder() -> Builder {
220        Builder {
221            config: Default::default(),
222            alpn: None,
223        }
224    }
225
226    /// Listen on a quinn endpoint for incoming RPC connections.
227    #[cfg(feature = "rpc")]
228    pub async fn listen(self, endpoint: quinn::Endpoint) {
229        self.inner.api.listen(endpoint).await
230    }
231
232    /// Get the maximum message size configured for this gossip actor.
233    pub fn max_message_size(&self) -> usize {
234        self.inner.max_message_size
235    }
236
237    /// Handle an incoming [`Connection`].
238    ///
239    /// Make sure to check the ALPN protocol yourself before passing the connection.
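    ///
    /// A manual accept-loop sketch, modeled on the `endpoint_loop` helper in this module's
    /// tests (error handling elided):
    ///
    /// ```rust,ignore
    /// while let Some(incoming) = endpoint.accept().await {
    ///     let connection = incoming.accept()?.await?;
    ///     if connection.alpn().as_deref() == Some(GOSSIP_ALPN) {
    ///         gossip.handle_connection(connection).await?;
    ///     }
    /// }
    /// ```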
240    pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
241        self.inner
242            .local_tx
243            .send(LocalActorMessage::HandleConnection(conn))
244            .await?;
245        Ok(())
246    }
247
248    /// Shutdown the gossip instance.
249    ///
250    /// This leaves all topics, sending `Disconnect` messages to peers, and then
251    /// stops the gossip actor loop and drops all state and connections.
252    pub async fn shutdown(&self) -> Result<(), Error> {
253        let (reply, reply_rx) = oneshot::channel();
254        self.inner
255            .local_tx
256            .send(LocalActorMessage::Shutdown { reply })
257            .await?;
258        reply_rx.await?;
259        Ok(())
260    }
261
262    /// Returns the metrics tracked for this gossip instance.
263    pub fn metrics(&self) -> &Arc<Metrics> {
264        &self.inner.metrics
265    }
266}
267
268/// Actor that sends and handles messages between the connection and main state loops
269struct Actor {
270    alpn: Bytes,
271    /// Protocol state
272    state: proto::State<PublicKey, StdRng>,
273    /// The endpoint through which we dial peers
274    endpoint: Endpoint,
275    /// Dial machine to connect to peers
276    dialer: Dialer,
277    /// Input messages to the actor
278    rpc_rx: mpsc::Receiver<RpcMessage>,
279    local_rx: mpsc::Receiver<LocalActorMessage>,
280    /// Sender for the state input (cloned into the connection loops)
281    in_event_tx: mpsc::Sender<InEvent>,
282    /// Input events to the state (emitted from the connection loops)
283    in_event_rx: mpsc::Receiver<InEvent>,
284    /// Queued timers
285    timers: Timers<Timer>,
286    /// Map of topics to their state.
287    topics: HashMap<TopicId, TopicState>,
288    /// Map of peers to their state.
289    peers: HashMap<NodeId, PeerState>,
290    /// Stream of commands from topic handles.
291    command_rx: stream_group::Keyed<TopicCommandStream>,
293    /// Internal queue of topics to close because all handles were dropped.
293    quit_queue: VecDeque<TopicId>,
294    /// Tasks for the connection loops, to keep track of panics.
295    connection_tasks: JoinSet<(NodeId, Connection, Result<(), ConnectionLoopError>)>,
296    metrics: Arc<Metrics>,
297    topic_event_forwarders: JoinSet<TopicId>,
298    discovery: GossipDiscovery,
299}
300
301impl Actor {
302    fn new(
303        endpoint: Endpoint,
304        config: proto::Config,
305        metrics: Arc<Metrics>,
306        alpn: Option<Bytes>,
307        discovery: GossipDiscovery,
308    ) -> (
309        Self,
310        mpsc::Sender<RpcMessage>,
311        mpsc::Sender<LocalActorMessage>,
312    ) {
313        let peer_id = endpoint.node_id();
314        let dialer = Dialer::new(endpoint.clone());
315        let state = proto::State::new(
316            peer_id,
317            Default::default(),
318            config,
319            rand::rngs::StdRng::from_rng(&mut rand::rng()),
320        );
321        let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
322        let (local_tx, local_rx) = mpsc::channel(16);
323        let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);
324
325        let actor = Actor {
326            alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
327            endpoint,
328            state,
329            dialer,
330            rpc_rx,
331            in_event_rx,
332            in_event_tx,
333            timers: Timers::new(),
334            command_rx: StreamGroup::new().keyed(),
335            peers: Default::default(),
336            topics: Default::default(),
337            quit_queue: Default::default(),
338            connection_tasks: Default::default(),
339            metrics,
340            local_rx,
341            topic_event_forwarders: Default::default(),
342            discovery,
343        };
344
345        (actor, rpc_tx, local_tx)
346    }
347
348    pub async fn run(mut self) {
349        let mut addr_update_stream = self.setup().await;
350
351        let mut i = 0;
352        while self.event_loop(&mut addr_update_stream, i).await {
353            i += 1;
354        }
355    }
356
357    /// Performs the initial actor setup to run the [`Actor::event_loop`].
358    ///
359    /// This publishes our current address to the protocol state and returns the endpoint's
360    /// node address update stream, which the event loop polls for further updates.
361    async fn setup(&mut self) -> impl Stream<Item = NodeAddr> + Send + Unpin + use<> {
362        let addr_update_stream = self.endpoint.watch_node_addr().stream();
363        let initial_addr = self.endpoint.node_addr();
364        self.handle_addr_update(initial_addr).await;
365        addr_update_stream
366    }
367
368    /// One event loop processing step.
369    ///
370    /// Returns `false` when no further processing should be performed.
371    async fn event_loop(
372        &mut self,
373        addr_updates: &mut (impl Stream<Item = NodeAddr> + Send + Unpin),
374        i: usize,
375    ) -> bool {
376        self.metrics.actor_tick_main.inc();
377        tokio::select! {
378            biased;
379            conn = self.local_rx.recv() => {
380                match conn {
381                    Some(LocalActorMessage::Shutdown { reply }) => {
382                        debug!("received shutdown message, quit all topics");
383                        self.quit_queue.extend(self.topics.keys().copied());
384                        self.process_quit_queue().await;
385                        debug!("all topics quit, stop gossip actor");
386                        reply.send(()).ok();
387                        return false;
388                    },
389                    Some(LocalActorMessage::HandleConnection(conn)) => {
390                        if let Ok(remote_node_id) = conn.remote_node_id() {
391                            self.handle_connection(remote_node_id, ConnOrigin::Accept, conn);
392                        }
393                    }
394                    None => {
395                        debug!("all gossip handles dropped, stop gossip actor");
396                        return false;
397                    }
398                }
399            }
400            msg = self.rpc_rx.recv() => {
401                trace!(?i, "tick: to_actor_rx");
402                self.metrics.actor_tick_rx.inc();
403                match msg {
404                    Some(msg) => {
405                        self.handle_rpc_msg(msg, Instant::now()).await;
406                    }
407                    None => {
408                        debug!("all gossip handles dropped, stop gossip actor");
409                        return false;
410                    }
411                }
412            },
413            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
414                trace!(?i, "tick: command_rx");
415                self.handle_command(topic, key, command).await;
416            },
417            Some(new_address) = addr_updates.next() => {
418                trace!(?i, "tick: new_address");
419                self.metrics.actor_tick_endpoint.inc();
420                self.handle_addr_update(new_address).await;
421            }
422            (peer_id, res) = self.dialer.next_conn() => {
423                trace!(?i, "tick: dialer");
424                self.metrics.actor_tick_dialer.inc();
425                match res {
426                    Some(Ok(conn)) => {
427                        debug!(peer = %peer_id.fmt_short(), "dial successful");
428                        self.metrics.actor_tick_dialer_success.inc();
429                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
430                    }
431                    Some(Err(err)) => {
432                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
433                        self.metrics.actor_tick_dialer_failure.inc();
434                        let peer_state = self.peers.get(&peer_id);
435                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
436                        if !is_active {
437                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
438                                .await;
439                        }
440                    }
441                    None => {
442                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
443                        self.metrics.actor_tick_dialer_failure.inc();
444                    }
445                }
446            }
447            event = self.in_event_rx.recv() => {
448                trace!(?i, "tick: in_event_rx");
449                self.metrics.actor_tick_in_event_rx.inc();
450                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
451                self.handle_in_event(event, Instant::now()).await;
452            }
453            _ = self.timers.wait_next() => {
454                trace!(?i, "tick: timers");
455                self.metrics.actor_tick_timers.inc();
456                let now = Instant::now();
457                while let Some((_instant, timer)) = self.timers.pop_before(now) {
458                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
459                }
460            }
461            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
462                trace!(?i, "tick: connection_tasks");
463                let (peer_id, conn, result) = res.expect("connection task panicked");
464                self.handle_connection_task_finished(peer_id, conn, result).await;
465            }
466            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
467                let topic_id = res.expect("topic event forwarder panicked");
468                if let Some(state) = self.topics.get_mut(&topic_id) {
469                    if !state.still_needed() {
470                        self.quit_queue.push_back(topic_id);
471                        self.process_quit_queue().await;
472                    }
473                }
474            }
475        }
476
477        true
478    }
479
480    async fn handle_addr_update(&mut self, node_addr: NodeAddr) {
481        // let peer_data = our_peer_data(&self.endpoint, current_addresses);
482        let peer_data = encode_peer_data(&node_addr.into());
483        self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
484            .await
485    }
486
487    async fn handle_command(
488        &mut self,
489        topic: TopicId,
490        key: stream_group::Key,
491        command: Option<Command>,
492    ) {
493        debug!(?topic, ?key, ?command, "handle command");
494        let Some(state) = self.topics.get_mut(&topic) else {
495            // TODO: unreachable?
496            warn!("received command for unknown topic");
497            return;
498        };
499        match command {
500            Some(command) => {
501                let command = match command {
502                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
503                    Command::BroadcastNeighbors(message) => {
504                        ProtoCommand::Broadcast(message, Scope::Neighbors)
505                    }
506                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
507                };
508                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
509                    .await;
510            }
511            None => {
512                state.command_rx_keys.remove(&key);
513                if !state.still_needed() {
514                    self.quit_queue.push_back(topic);
515                    self.process_quit_queue().await;
516                }
517            }
518        }
519    }
520
521    fn handle_connection(&mut self, peer_id: NodeId, origin: ConnOrigin, conn: Connection) {
522        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
523        let conn_id = conn.stable_id();
524
525        let queue = match self.peers.entry(peer_id) {
526            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
527            Entry::Vacant(entry) => {
528                entry.insert(PeerState::Active {
529                    active_send_tx: send_tx,
530                    active_conn_id: conn_id,
531                    other_conns: Vec::new(),
532                });
533                Vec::new()
534            }
535        };
536
537        let max_message_size = self.state.max_message_size();
538        let in_event_tx = self.in_event_tx.clone();
539
540        // Spawn a task for this connection
541        self.connection_tasks.spawn(
542            async move {
543                let res = connection_loop(
544                    peer_id,
545                    conn.clone(),
546                    origin,
547                    send_rx,
548                    in_event_tx,
549                    max_message_size,
550                    queue,
551                )
552                .await;
553                (peer_id, conn, res)
554            }
555            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
556        );
557    }
558
559    #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
560    async fn handle_connection_task_finished(
561        &mut self,
562        peer_id: NodeId,
563        conn: Connection,
564        task_result: Result<(), ConnectionLoopError>,
565    ) {
566        if conn.close_reason().is_none() {
567            conn.close(0u32.into(), b"close from disconnect");
568        }
569        let reason = conn.close_reason().expect("just closed");
570        let error = task_result.err();
571        debug!(%reason, ?error, "connection closed");
572        if let Some(PeerState::Active {
573            active_conn_id,
574            other_conns,
575            ..
576        }) = self.peers.get_mut(&peer_id)
577        {
578            if conn.stable_id() == *active_conn_id {
579                debug!("active send connection closed, mark peer as disconnected");
580                self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
581                    .await;
582            } else {
583                other_conns.retain(|x| *x != conn.stable_id());
584                debug!("remaining {} other connections", other_conns.len());
585            }
586        } else {
587            debug!("peer already marked as disconnected");
588        }
589    }
590
591    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
592        trace!("handle to_actor  {msg:?}");
593        match msg {
594            RpcMessage::Join(msg) => {
595                let WithChannels {
596                    inner,
597                    rx,
598                    tx,
599                    // TODO(frando): make use of span?
600                    span: _,
601                } = msg;
602                let api::JoinRequest {
603                    topic_id,
604                    bootstrap,
605                } = inner;
606                let TopicState {
607                    neighbors,
608                    event_sender,
609                    command_rx_keys,
610                } = self.topics.entry(topic_id).or_default();
611                let mut sender_dead = false;
612                if !neighbors.is_empty() {
613                    for neighbor in neighbors.iter() {
614                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
615                            sender_dead = true;
616                            break;
617                        }
618                    }
619                }
620
621                if !sender_dead {
622                    let fut =
623                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
624                    self.topic_event_forwarders
625                        .spawn(fut.instrument(tracing::Span::current()));
626                }
627                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
628                let key = self.command_rx.insert(command_rx);
629                command_rx_keys.insert(key);
630
631                self.handle_in_event(
632                    InEvent::Command(
633                        topic_id,
634                        ProtoCommand::Join(bootstrap.into_iter().collect()),
635                    ),
636                    now,
637                )
638                .await;
639            }
640        }
641    }
642
643    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
644        self.handle_in_event_inner(event, now).await;
645        self.process_quit_queue().await;
646    }
647
648    async fn process_quit_queue(&mut self) {
649        while let Some(topic_id) = self.quit_queue.pop_front() {
650            self.handle_in_event_inner(
651                InEvent::Command(topic_id, ProtoCommand::Quit),
652                Instant::now(),
653            )
654            .await;
655            if self.topics.remove(&topic_id).is_some() {
656                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
657            }
658        }
659    }
660
661    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
662        if matches!(event, InEvent::TimerExpired(_)) {
663            trace!(?event, "handle in_event");
664        } else {
665            debug!(?event, "handle in_event");
666        };
667        let out = self.state.handle(event, now, Some(&self.metrics));
668        for event in out {
669            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
670                trace!(?event, "handle out_event");
671            } else {
672                debug!(?event, "handle out_event");
673            };
674            match event {
675                OutEvent::SendMessage(peer_id, message) => {
676                    let state = self.peers.entry(peer_id).or_default();
677                    match state {
678                        PeerState::Active { active_send_tx, .. } => {
679                            if let Err(_err) = active_send_tx.send(message).await {
680                                // Removing the peer is handled by the in_event PeerDisconnected sent
681                                // in [`Self::handle_connection_task_finished`].
682                                warn!(
683                                    peer = %peer_id.fmt_short(),
684                                    "failed to send: connection task send loop terminated",
685                                );
686                            }
687                        }
688                        PeerState::Pending { queue } => {
689                            if queue.is_empty() {
690                                debug!(peer = %peer_id.fmt_short(), "start to dial");
691                                self.dialer.queue_dial(peer_id, self.alpn.clone());
692                            }
693                            queue.push(message);
694                        }
695                    }
696                }
697                OutEvent::EmitEvent(topic_id, event) => {
698                    let Some(state) = self.topics.get_mut(&topic_id) else {
699                        // TODO: unreachable?
700                        warn!(?topic_id, "gossip state emitted event for unknown topic");
701                        continue;
702                    };
703                    let TopicState {
704                        neighbors,
705                        event_sender,
706                        ..
707                    } = state;
708                    match &event {
709                        ProtoEvent::NeighborUp(neighbor) => {
710                            neighbors.insert(*neighbor);
711                        }
712                        ProtoEvent::NeighborDown(neighbor) => {
713                            neighbors.remove(neighbor);
714                        }
715                        _ => {}
716                    }
717                    event_sender.send(event).ok();
718                    if !state.still_needed() {
719                        self.quit_queue.push_back(topic_id);
720                    }
721                }
722                OutEvent::ScheduleTimer(delay, timer) => {
723                    self.timers.insert(now + delay, timer);
724                }
725                OutEvent::DisconnectPeer(peer_id) => {
726                    // signal disconnection by dropping the senders to the connection
727                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
728                    self.peers.remove(&peer_id);
729                }
730                OutEvent::PeerData(node_id, data) => match decode_peer_data(&data) {
731                    Err(err) => warn!("Failed to decode {data:?} from {node_id}: {err}"),
732                    Ok(info) => {
733                        debug!(peer = ?node_id, "add known addrs: {info:?}");
734                        let node_addr = NodeAddr {
735                            node_id,
736                            relay_url: info.relay_url,
737                            direct_addresses: info.direct_addresses,
738                        };
739                        self.discovery.add(node_addr);
740                    }
741                },
742            }
743        }
744    }
745}
746
747type ConnId = usize;
748
749#[derive(Debug)]
750enum PeerState {
751    Pending {
752        queue: Vec<ProtoMessage>,
753    },
754    Active {
755        active_send_tx: mpsc::Sender<ProtoMessage>,
756        active_conn_id: ConnId,
757        other_conns: Vec<ConnId>,
758    },
759}
760
761impl PeerState {
762    fn accept_conn(
763        &mut self,
764        send_tx: mpsc::Sender<ProtoMessage>,
765        conn_id: ConnId,
766    ) -> Vec<ProtoMessage> {
767        match self {
768            PeerState::Pending { queue } => {
769                let queue = std::mem::take(queue);
770                *self = PeerState::Active {
771                    active_send_tx: send_tx,
772                    active_conn_id: conn_id,
773                    other_conns: Vec::new(),
774                };
775                queue
776            }
777            PeerState::Active {
778                active_send_tx,
779                active_conn_id,
780                other_conns,
781            } => {
782                // We already have an active connection. We keep the old connection intact,
783                // but only use the new connection for sending from now on.
784                // By dropping the `send_tx` of the old connection, the send loop part of
785                // the `connection_loop` of the old connection will terminate, which will also
786                // notify the peer that the old connection may be dropped.
787                other_conns.push(*active_conn_id);
788                *active_send_tx = send_tx;
789                *active_conn_id = conn_id;
790                Vec::new()
791            }
792        }
793    }
794}
795
796impl Default for PeerState {
797    fn default() -> Self {
798        PeerState::Pending { queue: Vec::new() }
799    }
800}
801
802#[derive(Debug)]
803struct TopicState {
804    neighbors: BTreeSet<NodeId>,
805    event_sender: broadcast::Sender<ProtoEvent>,
806    /// Keys identifying command receivers in [`Actor::command_rx`].
807    ///
808    /// This represents the receiver side of gossip's publish public API.
809    command_rx_keys: HashSet<stream_group::Key>,
810}
811
812impl Default for TopicState {
813    fn default() -> Self {
814        let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
815        Self {
816            neighbors: Default::default(),
817            command_rx_keys: Default::default(),
818            event_sender,
819        }
820    }
821}
822
823impl TopicState {
824    /// Check if the topic still has any publisher or subscriber.
825    fn still_needed(&self) -> bool {
826        !self.command_rx_keys.is_empty() || self.event_sender.receiver_count() > 0
827    }
828
829    #[cfg(test)]
830    fn joined(&self) -> bool {
831        !self.neighbors.is_empty()
832    }
833}
834
835/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept)
836#[derive(Debug, Clone, Copy, PartialEq, Eq)]
837enum ConnOrigin {
838    Accept,
839    Dial,
840}
841
842#[allow(missing_docs)]
843#[common_fields({
844    backtrace: Option<snafu::Backtrace>,
845})]
846#[derive(Debug, Snafu)]
847#[snafu(module)]
848#[non_exhaustive]
849enum ConnectionLoopError {
850    #[snafu(transparent)]
851    Write {
852        source: self::util::WriteError,
853    },
854    #[snafu(transparent)]
855    Read {
856        source: self::util::ReadError,
857    },
858    #[snafu(transparent)]
859    Connection {
860        source: iroh::endpoint::ConnectionError,
861    },
862    ActorDropped {},
863}
864
865impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
866    fn from(_value: mpsc::error::SendError<T>) -> Self {
867        self::connection_loop_error::ActorDroppedSnafu.build()
868    }
869}
870
871async fn connection_loop(
872    from: PublicKey,
873    conn: Connection,
874    origin: ConnOrigin,
875    send_rx: mpsc::Receiver<ProtoMessage>,
876    in_event_tx: mpsc::Sender<InEvent>,
877    max_message_size: usize,
878    queue: Vec<ProtoMessage>,
879) -> Result<(), ConnectionLoopError> {
880    debug!(?origin, "connection established");
881
882    let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
883    let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);
884
885    let send_fut = send_loop.run(queue).instrument(error_span!("send"));
886    let recv_fut = recv_loop.run().instrument(error_span!("recv"));
887
888    let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
889    send_res?;
890    recv_res?;
891    Ok(())
892}
893
894#[derive(Default, Debug, Clone, Serialize, Deserialize)]
895struct AddrInfo {
896    relay_url: Option<RelayUrl>,
897    direct_addresses: BTreeSet<SocketAddr>,
898}
899
900impl From<NodeAddr> for AddrInfo {
901    fn from(
902        NodeAddr {
903            relay_url,
904            direct_addresses,
905            ..
906        }: NodeAddr,
907    ) -> Self {
908        Self {
909            relay_url,
910            direct_addresses,
911        }
912    }
913}
914
915fn encode_peer_data(info: &AddrInfo) -> PeerData {
916    let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo may not fail");
917    PeerData::new(bytes)
918}
919
920fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
921    let bytes = peer_data.as_bytes();
922    if bytes.is_empty() {
923        return Ok(AddrInfo::default());
924    }
925    let info = postcard::from_bytes(bytes)?;
926    Ok(info)
927}
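
// A minimal round-trip sketch for the peer data encoding above; the crate's tests live in
// the `test` module at the bottom of this file, so this standalone check is only illustrative.
#[cfg(test)]
#[test]
fn addr_info_peer_data_roundtrip() {
    let addr: SocketAddr = "127.0.0.1:1234".parse().unwrap();
    let info = AddrInfo {
        relay_url: None,
        direct_addresses: [addr].into_iter().collect(),
    };
    let decoded = decode_peer_data(&encode_peer_data(&info)).unwrap();
    assert_eq!(decoded.direct_addresses, info.direct_addresses);
    // Empty peer data decodes to the default (no relay url, no direct addresses).
    let empty = decode_peer_data(&PeerData::new(Vec::<u8>::new())).unwrap();
    assert!(empty.relay_url.is_none() && empty.direct_addresses.is_empty());
}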
928
929async fn topic_subscriber_loop(
930    sender: irpc::channel::mpsc::Sender<Event>,
931    mut topic_events: broadcast::Receiver<ProtoEvent>,
932) {
933    loop {
934        tokio::select! {
935           biased;
936           msg = topic_events.recv() => {
937               let event = match msg {
938                   Err(broadcast::error::RecvError::Closed) => break,
939                   Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
940                   Ok(event) => event.into(),
941               };
942               if sender.send(event).await.is_err() {
943                   break;
944               }
945           }
946           _ = sender.closed() => break,
947        }
948    }
949}
950
951/// A stream of commands for a gossip subscription.
952type BoxedCommandReceiver =
953    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;
954
955#[derive(derive_more::Debug)]
956struct TopicCommandStream {
957    topic_id: TopicId,
958    #[debug("CommandStream")]
959    stream: BoxedCommandReceiver,
960    closed: bool,
961}
962
963impl TopicCommandStream {
964    fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
965        Self {
966            topic_id,
967            stream,
968            closed: false,
969        }
970    }
971}
972
973impl Stream for TopicCommandStream {
974    type Item = (TopicId, Option<Command>);
975    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
976        if self.closed {
977            return Poll::Ready(None);
978        }
979        match Pin::new(&mut self.stream).poll_next(cx) {
980            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
981            Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
982                self.closed = true;
983                Poll::Ready(Some((self.topic_id, None)))
984            }
985            Poll::Pending => Poll::Pending,
986        }
987    }
988}
989
990#[derive(Debug)]
991struct Dialer {
992    endpoint: Endpoint,
993    pending: JoinSet<(
994        NodeId,
995        Option<Result<Connection, iroh::endpoint::ConnectError>>,
996    )>,
997    pending_dials: HashMap<NodeId, CancellationToken>,
998}
999
1000impl Dialer {
1001    /// Create a new dialer for a [`Endpoint`]
1002    fn new(endpoint: Endpoint) -> Self {
1003        Self {
1004            endpoint,
1005            pending: Default::default(),
1006            pending_dials: Default::default(),
1007        }
1008    }
1009
1010    /// Starts to dial a node by [`NodeId`].
1011    fn queue_dial(&mut self, node_id: NodeId, alpn: Bytes) {
1012        if self.is_pending(node_id) {
1013            return;
1014        }
1015        let cancel = CancellationToken::new();
1016        self.pending_dials.insert(node_id, cancel.clone());
1017        let endpoint = self.endpoint.clone();
1018        self.pending.spawn(
1019            async move {
1020                let res = tokio::select! {
1021                    biased;
1022                    _ = cancel.cancelled() => None,
1023                    res = endpoint.connect(node_id, &alpn) => Some(res),
1024                };
1025                (node_id, res)
1026            }
1027            .instrument(tracing::Span::current()),
1028        );
1029    }
1030
1031    /// Checks if a node is currently being dialed.
1032    fn is_pending(&self, node: NodeId) -> bool {
1033        self.pending_dials.contains_key(&node)
1034    }
1035
1036    /// Waits for the next dial operation to complete.
1037    /// `None` means the dial was cancelled.
1038    async fn next_conn(
1039        &mut self,
1040    ) -> (
1041        NodeId,
1042        Option<Result<Connection, iroh::endpoint::ConnectError>>,
1043    ) {
1044        match self.pending_dials.is_empty() {
1045            false => {
1046                let (node_id, res) = loop {
1047                    match self.pending.join_next().await {
1048                        Some(Ok((node_id, res))) => {
1049                            self.pending_dials.remove(&node_id);
1050                            break (node_id, res);
1051                        }
1052                        Some(Err(e)) => {
1053                            error!("next conn error: {:?}", e);
1054                        }
1055                        None => {
1056                            error!("no more pending conns available");
1057                            std::future::pending().await
1058                        }
1059                    }
1060                };
1061
1062                (node_id, res)
1063            }
1064            true => std::future::pending().await,
1065        }
1066    }
1067}
1068
1069#[cfg(test)]
1070pub(crate) mod test {
1071    use std::time::Duration;
1072
1073    use bytes::Bytes;
1074    use futures_concurrency::future::TryJoin;
1075    use iroh::{
1076        discovery::static_provider::StaticProvider, endpoint::BindError, protocol::Router,
1077        RelayMap, RelayMode, SecretKey,
1078    };
1079    use n0_snafu::{Result, ResultExt};
1080    use rand::{CryptoRng, Rng};
1081    use tokio::{spawn, time::timeout};
1082    use tokio_util::sync::CancellationToken;
1083    use tracing::{info, instrument};
1084    use tracing_test::traced_test;
1085
1086    use super::*;
1087    use crate::api::{ApiError, GossipReceiver, GossipSender};
1088
1089    struct ManualActorLoop {
1090        actor: Actor,
1091        step: usize,
1092    }
1093
1094    impl std::ops::Deref for ManualActorLoop {
1095        type Target = Actor;
1096
1097        fn deref(&self) -> &Self::Target {
1098            &self.actor
1099        }
1100    }
1101
1102    impl std::ops::DerefMut for ManualActorLoop {
1103        fn deref_mut(&mut self) -> &mut Self::Target {
1104            &mut self.actor
1105        }
1106    }
1107
1108    type EndpointHandle = tokio::task::JoinHandle<Result<()>>;
1109
1110    impl ManualActorLoop {
1111        #[instrument(skip_all, fields(me = %actor.endpoint.node_id().fmt_short()))]
1112        async fn new(mut actor: Actor) -> Self {
1113            let _ = actor.setup().await;
1114            Self { actor, step: 0 }
1115        }
1116
1117        #[instrument(skip_all, fields(me = %self.endpoint.node_id().fmt_short()))]
1118        async fn step(&mut self) -> bool {
1119            let ManualActorLoop { actor, step } = self;
1120            *step += 1;
1121            // ignore updates that change our published address. This gives us better control over
1122            // events since the endpoint is no longer emitting changes
1123            let addr_update_stream = &mut futures_lite::stream::pending();
1124            actor.event_loop(addr_update_stream, *step).await
1125        }
1126
1127        async fn steps(&mut self, n: usize) {
1128            for _ in 0..n {
1129                self.step().await;
1130            }
1131        }
1132
1133        async fn finish(mut self) {
1134            while self.step().await {}
1135        }
1136    }
1137
1138    impl Gossip {
1139        /// Creates a testing gossip instance and its actor without spawning it.
1140        ///
1141        /// This creates the endpoint and spawns the endpoint loop as well. The handle for the
1142        /// endpoint task is returned along with the gossip instance and actor. Since the actor is not
1143        /// actually spawned as [`Builder::spawn`] would, the gossip instance will have a
1144        /// handle to a dummy task instead.
1145        async fn t_new_with_actor(
1146            rng: &mut rand_chacha::ChaCha12Rng,
1147            config: proto::Config,
1148            relay_map: RelayMap,
1149            cancel: &CancellationToken,
1150        ) -> Result<(Self, Actor, EndpointHandle), BindError> {
1151            let endpoint = create_endpoint(rng, relay_map, None).await?;
1152            let metrics = Arc::new(Metrics::default());
1153            let discovery = GossipDiscovery::default();
1154            endpoint.discovery().add(discovery.clone());
1155
1156            let (actor, to_actor_tx, conn_tx) =
1157                Actor::new(endpoint, config, metrics.clone(), None, discovery);
1158            let max_message_size = actor.state.max_message_size();
1159
1160            let _actor_handle =
1161                AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
1162            let gossip = Self {
1163                inner: Inner {
1164                    api: GossipApi::local(to_actor_tx),
1165                    local_tx: conn_tx,
1166                    _actor_handle,
1167                    max_message_size,
1168                    metrics,
1169                }
1170                .into(),
1171            };
1172
1173            let endpoint_task = task::spawn(endpoint_loop(
1174                actor.endpoint.clone(),
1175                gossip.clone(),
1176                cancel.child_token(),
1177            ));
1178
1179            Ok((gossip, actor, endpoint_task))
1180        }
1181
1182        /// Creates a new testing gossip instance with the normal actor loop.
1183        async fn t_new(
1184            rng: &mut rand_chacha::ChaCha12Rng,
1185            config: proto::Config,
1186            relay_map: RelayMap,
1187            cancel: &CancellationToken,
1188        ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
1189            let (g, actor, ep_handle) =
1190                Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
1191            let ep = actor.endpoint.clone();
1192            let me = ep.node_id().fmt_short();
1193            let actor_handle =
1194                task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
1195            Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
1196        }
1197    }
1198
1199    pub(crate) async fn create_endpoint(
1200        rng: &mut rand_chacha::ChaCha12Rng,
1201        relay_map: RelayMap,
1202        static_provider: Option<StaticProvider>,
1203    ) -> Result<Endpoint, BindError> {
1204        let ep = Endpoint::builder()
1205            .secret_key(SecretKey::generate(rng))
1206            .alpns(vec![GOSSIP_ALPN.to_vec()])
1207            .relay_mode(RelayMode::Custom(relay_map))
1208            .insecure_skip_relay_cert_verify(true)
1209            .bind()
1210            .await?;
1211
1212        if let Some(static_provider) = static_provider {
1213            ep.discovery().add(static_provider);
1214        }
1215        ep.online().await;
1216        Ok(ep)
1217    }
1218
1219    async fn endpoint_loop(
1220        endpoint: Endpoint,
1221        gossip: Gossip,
1222        cancel: CancellationToken,
1223    ) -> Result<()> {
1224        loop {
1225            tokio::select! {
1226                biased;
1227                _ = cancel.cancelled() => break,
1228                incoming = endpoint.accept() => match incoming {
1229                    None => break,
1230                    Some(incoming) => {
1231                        let connecting = match incoming.accept() {
1232                            Ok(connecting) => connecting,
1233                            Err(err) => {
1234                                warn!("incoming connection failed: {err:#}");
1235                                // we can carry on in these cases:
1236                                // this can be caused by retransmitted datagrams
1237                                continue;
1238                            }
1239                        };
1240                        gossip.handle_connection(connecting.await.e()?).await?
1241                    }
1242                }
1243            }
1244        }
1245        Ok(())
1246    }
1247
1248    #[tokio::test]
1249    #[traced_test]
1250    async fn gossip_net_smoke() {
1251        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1252        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1253
1254        let static_provider = StaticProvider::new();
1255
1256        let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
1257            .await
1258            .unwrap();
1259        let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
1260            .await
1261            .unwrap();
1262        let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
1263            .await
1264            .unwrap();
1265
1266        let go1 = Gossip::builder().spawn(ep1.clone());
1267        let go2 = Gossip::builder().spawn(ep2.clone());
1268        let go3 = Gossip::builder().spawn(ep3.clone());
1269        debug!("peer1 {:?}", ep1.node_id());
1270        debug!("peer2 {:?}", ep2.node_id());
1271        debug!("peer3 {:?}", ep3.node_id());
1272        let pi1 = ep1.node_id();
1273        let pi2 = ep2.node_id();
1274
1275        let cancel = CancellationToken::new();
1276        let tasks = [
1277            spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
1278            spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
1279            spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
1280        ];
1281
1282        debug!("----- adding peers  ----- ");
1283        let topic: TopicId = blake3::hash(b"foobar").into();
1284
1285        let addr1 = NodeAddr::new(pi1).with_relay_url(relay_url.clone());
1286        let addr2 = NodeAddr::new(pi2).with_relay_url(relay_url);
1287        static_provider.add_node_info(addr1.clone());
1288        static_provider.add_node_info(addr2.clone());
1289
1290        debug!("----- joining  ----- ");
1291        // join the topics and wait for the connection to succeed
1292        let [sub1, mut sub2, mut sub3] = [
1293            go1.subscribe_and_join(topic, vec![]),
1294            go2.subscribe_and_join(topic, vec![pi1]),
1295            go3.subscribe_and_join(topic, vec![pi2]),
1296        ]
1297        .try_join()
1298        .await
1299        .unwrap();
1300
1301        let (sink1, _stream1) = sub1.split();
1302
1303        let len = 2;
1304
1305        // publish messages on node1
1306        let pub1 = spawn(async move {
1307            for i in 0..len {
1308                let message = format!("hi{i}");
1309                info!("go1 broadcast: {message:?}");
1310                sink1.broadcast(message.into_bytes().into()).await.unwrap();
1311                tokio::time::sleep(Duration::from_micros(1)).await;
1312            }
1313        });
1314
1315        // wait for messages on node2
1316        let sub2 = spawn(async move {
1317            let mut recv = vec![];
1318            loop {
1319                let ev = sub2.next().await.unwrap().unwrap();
1320                info!("go2 event: {ev:?}");
1321                if let Event::Received(msg) = ev {
1322                    recv.push(msg.content);
1323                }
1324                if recv.len() == len {
1325                    return recv;
1326                }
1327            }
1328        });
1329
1330        // wait for messages on node3
1331        let sub3 = spawn(async move {
1332            let mut recv = vec![];
1333            loop {
1334                let ev = sub3.next().await.unwrap().unwrap();
1335                info!("go3 event: {ev:?}");
1336                if let Event::Received(msg) = ev {
1337                    recv.push(msg.content);
1338                }
1339                if recv.len() == len {
1340                    return recv;
1341                }
1342            }
1343        });
1344
1345        timeout(Duration::from_secs(10), pub1)
1346            .await
1347            .unwrap()
1348            .unwrap();
1349        let recv2 = timeout(Duration::from_secs(10), sub2)
1350            .await
1351            .unwrap()
1352            .unwrap();
1353        let recv3 = timeout(Duration::from_secs(10), sub3)
1354            .await
1355            .unwrap()
1356            .unwrap();
1357
1358        let expected: Vec<Bytes> = (0..len)
1359            .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
1360            .collect();
1361        assert_eq!(recv2, expected);
1362        assert_eq!(recv3, expected);
1363
1364        cancel.cancel();
1365        for t in tasks {
1366            timeout(Duration::from_secs(10), t)
1367                .await
1368                .unwrap()
1369                .unwrap()
1370                .unwrap();
1371        }
1372    }
1373
1374    /// Test that when a gossip topic is no longer needed it's actually unsubscribed.
1375    ///
1376    /// This test will:
1377    /// - Create two endpoints, the first using manual event loop.
1378    /// - Subscribe both nodes to the same topic. The first node will subscribe twice and connect
1379    ///   to the second node. The second node will subscribe without bootstrap.
1380    /// - Ensure that the first node removes the subscription iff all topic handles have been
1381    ///   dropped
1382    // NOTE: this is a regression test.
1383    #[tokio::test]
1384    #[traced_test]
1385    async fn subscription_cleanup() -> Result {
1386        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1387        let ct = CancellationToken::new();
1388        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1389
1390        // create the first node with a manual actor loop
1391        let (go1, actor, ep1_handle) =
1392            Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
1393        let mut actor = ManualActorLoop::new(actor).await;
1394
1395        // create the second node with the usual actor loop
1396        let (go2, ep2, ep2_handle, _test_actor_handle) =
1397            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1398
1399        let node_id1 = actor.endpoint.node_id();
1400        let node_id2 = ep2.node_id();
1401        tracing::info!(
1402            node_1 = %node_id1.fmt_short(),
1403            node_2 = %node_id2.fmt_short(),
1404            "nodes ready"
1405        );
1406
1407        let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
1408        tracing::info!(%topic, "joining");
1409
1410        // create the tasks for each gossip instance:
1411        // - second node subscribes once without bootstrap and listens to events
1412        // - first node subscribes twice with the second node as bootstrap. This is done on command
1413        //   from the main task (this)
1414
1415        // second node
1416        let ct2 = ct.clone();
1417        let go2_task = async move {
1418            let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();
1419
1420            let subscribe_fut = async {
1421                while let Some(ev) = sub_rx.try_next().await? {
1422                    match ev {
1423                        Event::Lagged => tracing::debug!("missed some messages :("),
1424                        Event::Received(_) => unreachable!("test does not send messages"),
1425                        other => tracing::debug!(?other, "gs event"),
1426                    }
1427                }
1428
1429                tracing::debug!("subscribe stream ended");
1430                Result::<_, n0_snafu::Error>::Ok(())
1431            };
1432
1433            tokio::select! {
1434                _ = ct2.cancelled() => Ok(()),
1435                res = subscribe_fut => res,
1436            }
1437        }
1438        .instrument(tracing::debug_span!("node_2", %node_id2));
1439        let go2_handle = task::spawn(go2_task);
1440
1441        // first node
1442        let addr2 = NodeAddr::new(node_id2).with_relay_url(relay_url);
1443        let static_provider = StaticProvider::new();
1444        static_provider.add_node_info(addr2);
1445        actor.endpoint.discovery().add(static_provider);
1446        // we use a channel to signal advancing steps to the task
1447        let (tx, mut rx) = mpsc::channel::<()>(1);
1448        let ct1 = ct.clone();
1449        let go1_task = async move {
1450            // first subscribe is done immediately
1451            tracing::info!("subscribing the first time");
1452            let sub_1a = go1.subscribe_and_join(topic, vec![node_id2]).await?;
1453
1454            // wait for signal to subscribe a second time
1455            rx.recv().await.expect("signal for second subscribe");
1456            tracing::info!("subscribing a second time");
1457            let sub_1b = go1.subscribe_and_join(topic, vec![node_id2]).await?;
1458            drop(sub_1a);
1459
1460            // wait for signal to drop the second handle as well
1461            rx.recv().await.expect("signal for second subscribe");
1462            tracing::info!("dropping all handles");
1463            drop(sub_1b);
1464
1465            // wait for cancellation
1466            ct1.cancelled().await;
1467            drop(go1);
1468
1469            Result::<_, n0_snafu::Error>::Ok(())
1470        }
1471        .instrument(tracing::debug_span!("node_1", %node_id1));
1472        let go1_handle = task::spawn(go1_task);
1473
1474        // advance and check that the topic is now subscribed
1475        actor.steps(3).await; // handle our subscribe;
1476                              // get peer connection;
1477                              // receive the other peer's information for a NeighborUp
1478        let state = actor.topics.get(&topic).expect("get registered topic");
1479        assert!(state.joined());
1480
1481        // signal the second subscribe, we should remain subscribed
1482        tx.send(()).await.e()?;
1483        actor.steps(3).await; // subscribe; first receiver gone; first sender gone
1484        let state = actor.topics.get(&topic).expect("get registered topic");
1485        assert!(state.joined());
1486
1487        // signal to drop the second handle; the topic should no longer be subscribed
1488        tx.send(()).await.e()?;
1489        actor.steps(2).await; // second receiver gone; second sender gone
1490        assert!(!actor.topics.contains_key(&topic));
1491
1492        // cleanup and ensure everything went as expected
1493        ct.cancel();
1494        let wait = Duration::from_secs(2);
1495        timeout(wait, ep1_handle).await.e()?.e()??;
1496        timeout(wait, ep2_handle).await.e()?.e()??;
1497        timeout(wait, go1_handle).await.e()?.e()??;
1498        timeout(wait, go2_handle).await.e()?.e()??;
1499        timeout(wait, actor.finish()).await.e()?;
1500
1501        Ok(())
1502    }
1503
1504    /// Test that nodes can reconnect to each other.
1505    ///
1506    /// This test creates two nodes subscribed to the same topic. The second node
1507    /// unsubscribes and then resubscribes; the connection between the nodes should
1508    /// succeed both times.
1509    // NOTE: This is a regression test
1510    #[tokio::test]
1511    #[traced_test]
1512    async fn can_reconnect() -> Result {
1513        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1514        let ct = CancellationToken::new();
1515        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1516
1517        let (go1, ep1, ep1_handle, _test_actor_handle1) =
1518            Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;
1519
1520        let (go2, ep2, ep2_handle, _test_actor_handle2) =
1521            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1522
1523        let node_id1 = ep1.node_id();
1524        let node_id2 = ep2.node_id();
1525        tracing::info!(
1526            node_1 = %node_id1.fmt_short(),
1527            node_2 = %node_id2.fmt_short(),
1528            "nodes ready"
1529        );
1530
1531        let topic: TopicId = blake3::hash(b"can_reconnect").into();
1532        tracing::info!(%topic, "joining");
1533
1534        let ct2 = ct.child_token();
1535        // channel used to signal the second gossip instance to advance the test
1536        let (tx, mut rx) = mpsc::channel::<()>(1);
1537        let addr1 = NodeAddr::new(node_id1).with_relay_url(relay_url.clone());
1538        let static_provider = StaticProvider::new();
1539        static_provider.add_node_info(addr1);
1540        ep2.discovery().add(static_provider.clone());
1541        let go2_task = async move {
1542            let mut sub = go2.subscribe(topic, Vec::new()).await?;
1543            sub.joined().await?;
1544
1545            rx.recv().await.expect("signal to unsubscribe");
1546            tracing::info!("unsubscribing");
1547            drop(sub);
1548
1549            rx.recv().await.expect("signal to subscribe again");
1550            tracing::info!("resubscribing");
1551            let mut sub = go2.subscribe(topic, vec![node_id1]).await?;
1552
1553            sub.joined().await?;
1554            tracing::info!("subscription successful!");
1555
1556            ct2.cancelled().await;
1557
1558            Result::<_, ApiError>::Ok(())
1559        }
1560        .instrument(tracing::debug_span!("node_2", %node_id2));
1561        let go2_handle = task::spawn(go2_task);
1562
1563        let addr2 = NodeAddr::new(node_id2).with_relay_url(relay_url);
1564        static_provider.add_node_info(addr2);
1565        ep1.discovery().add(static_provider);
1566
1567        let mut sub = go1.subscribe(topic, vec![node_id2]).await?;
1568        // wait for subscribed notification
1569        sub.joined().await?;
1570
1571        // signal node_2 to unsubscribe
1572        tx.send(()).await.e()?;
1573
1574        // we should receive a NeighborDown event
1575        let conn_timeout = Duration::from_millis(500);
1576        let ev = timeout(conn_timeout, sub.try_next()).await.e()??;
1577        assert_eq!(ev, Some(Event::NeighborDown(node_id2)));
1578        tracing::info!("node 2 left");
1579
1580        // signal node_2 to subscribe again
1581        tx.send(()).await.e()?;
1582
1583        let conn_timeout = Duration::from_millis(500);
1584        let ev = timeout(conn_timeout, sub.try_next()).await.e()??;
1585        assert_eq!(ev, Some(Event::NeighborUp(node_id2)));
1586        tracing::info!("node 2 rejoined!");
1587
1588        // cleanup and ensure everything went as expected
1589        ct.cancel();
1590        let wait = Duration::from_secs(2);
1591        timeout(wait, ep1_handle).await.e()?.e()??;
1592        timeout(wait, ep2_handle).await.e()?.e()??;
1593        timeout(wait, go2_handle).await.e()?.e()??;
1594
1595        Result::Ok(())
1596    }
1597
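    /// Test that a node can die abruptly and later rejoin under the same node id.
    ///
    /// A receiver node stays up the whole time. A broadcast node runs in a separate thread,
    /// is killed without a graceful shutdown, and is then spawned again with the same secret
    /// key; the test passes if a message from the respawned node reaches the receiver.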
1598    #[tokio::test]
1599    #[traced_test]
1600    async fn can_die_and_reconnect() -> Result {
1601        /// Runs a future in a separate runtime on a separate thread, cancelling everything
1602        /// abruptly once `cancel` is invoked.
1603        fn run_in_thread<T: Send + 'static>(
1604            cancel: CancellationToken,
1605            fut: impl std::future::Future<Output = T> + Send + 'static,
1606        ) -> std::thread::JoinHandle<Option<T>> {
1607            std::thread::spawn(move || {
1608                let rt = tokio::runtime::Builder::new_current_thread()
1609                    .enable_all()
1610                    .build()
1611                    .unwrap();
1612                rt.block_on(async move { cancel.run_until_cancelled(fut).await })
1613            })
1614        }
1615
1616        /// Spawns a new endpoint and gossip instance.
1617        async fn spawn_gossip(
1618            secret_key: SecretKey,
1619            relay_map: RelayMap,
1620        ) -> Result<(Router, Gossip), BindError> {
1621            let ep = Endpoint::builder()
1622                .secret_key(secret_key)
1623                .relay_mode(RelayMode::Custom(relay_map))
1624                .insecure_skip_relay_cert_verify(true)
1625                .bind()
1626                .await?;
1627            let gossip = Gossip::builder().spawn(ep.clone());
1628            let router = Router::builder(ep)
1629                .accept(GOSSIP_ALPN, gossip.clone())
1630                .spawn();
1631            Ok((router, gossip))
1632        }
1633
1634        /// Spawns a gossip node, broadcasts a single message, then sleeps until cancelled externally.
1635        async fn broadcast_once(
1636            secret_key: SecretKey,
1637            relay_map: RelayMap,
1638            bootstrap_addr: NodeAddr,
1639            topic_id: TopicId,
1640            message: String,
1641        ) -> Result {
1642            let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
1643            info!(node_id = %router.endpoint().node_id().fmt_short(), "broadcast node spawned");
1644            let bootstrap = vec![bootstrap_addr.node_id];
1645            let static_provider = StaticProvider::new();
1646            static_provider.add_node_info(bootstrap_addr);
1647            router.endpoint().discovery().add(static_provider);
1648            let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
1649            topic.broadcast(message.as_bytes().to_vec().into()).await?;
1650            std::future::pending::<()>().await;
1651            Ok(())
1652        }
1653
1654        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1655        let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1656        let topic_id = TopicId::from_bytes(rng.random());
1657
1658        // spawn a gossip node, send the node's address on addr_tx,
1659        // then forward each received message on msgs_recv_tx until aborted.
1660        let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
1661        let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
1662        let recv_task = tokio::task::spawn({
1663            let relay_map = relay_map.clone();
1664            let secret_key = SecretKey::generate(&mut rng);
1665            async move {
1666                let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
1667                // wait for the relay to be set. iroh currently has issues when trying
1668                // to immediately reconnect with changed direct addresses, but when the
1669                // relay path is available it works.
1670                // See https://github.com/n0-computer/iroh/pull/3372
1671                router.endpoint().online().await;
1672                let addr = router.endpoint().node_addr();
1673                info!(node_id = %addr.node_id.fmt_short(), "recv node spawned");
1674                addr_tx.send(addr).unwrap();
1675                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
1676                while let Some(event) = topic.try_next().await.unwrap() {
1677                    if let Event::Received(message) = event {
1678                        let message = std::str::from_utf8(&message.content).e()?.to_string();
1679                        msgs_recv_tx.send(message).await.e()?;
1680                    }
1681                }
1682                Result::<_, n0_snafu::Error>::Ok(())
1683            }
1684        });
1685
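        // wait for the receiver node to come online and report its address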
1686        let node0_addr = addr_rx.await.e()?;
1687        let max_wait = Duration::from_secs(5);
1688
1689        // spawn a node, send a message, and then terminate the node abruptly (no graceful
1690        // shutdown) after the message was received on our receiver node.
1691        let cancel = CancellationToken::new();
1692        let secret = SecretKey::generate(&mut rng);
1693        let join_handle_1 = run_in_thread(
1694            cancel.clone(),
1695            broadcast_once(
1696                secret.clone(),
1697                relay_map.clone(),
1698                node0_addr.clone(),
1699                topic_id,
1700                "msg1".to_string(),
1701            ),
1702        );
1703        // assert that we received the message on the receiver node.
1704        let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap();
1705        assert_eq!(&msg, "msg1");
1706        info!("kill broadcast node");
1707        cancel.cancel();
1708
1709        // spawn the node again with the same node id, and send another message
1710        let cancel = CancellationToken::new();
1711        let join_handle_2 = run_in_thread(
1712            cancel.clone(),
1713            broadcast_once(
1714                secret.clone(),
1715                relay_map.clone(),
1716                node0_addr.clone(),
1717                topic_id,
1718                "msg2".to_string(),
1719            ),
1720        );
1721        // assert that we received the message on the receiver node.
1722        // this means that the reconnect with the same node id worked.
1723        let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap();
1724        assert_eq!(&msg, "msg2");
1725        info!("kill broadcast node");
1726        cancel.cancel();
1727
1728        info!("kill recv node");
1729        recv_task.abort();
1730        assert!(join_handle_1.join().unwrap().is_none());
1731        assert!(join_handle_2.join().unwrap().is_none());
1732
1733        Ok(())
1734    }
1735
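    /// Test that gossip instances can be run under a custom ALPN instead of [`GOSSIP_ALPN`].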
1736    #[tokio::test]
1737    #[traced_test]
1738    async fn gossip_change_alpn() -> n0_snafu::Result<()> {
1739        let alpn = b"my-gossip-alpn";
1740        let topic_id = TopicId::from([0u8; 32]);
1741
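        // two endpoints, each with a gossip instance accepted under the custom ALPN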
1742        let ep1 = Endpoint::builder().bind().await?;
1743        let ep2 = Endpoint::builder().bind().await?;
1744        let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
1745        let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
1746        let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
1747        let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();
1748
1749        let addr1 = router1.endpoint().node_addr();
1750        let id1 = addr1.node_id;
1751        let static_provider = StaticProvider::new();
1752        static_provider.add_node_info(addr1);
1753        router2.endpoint().discovery().add(static_provider);
1754
1755        let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
1756        let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;
1757
1758        timeout(Duration::from_secs(3), topic1.joined())
1759            .await
1760            .e()??;
1761        timeout(Duration::from_secs(3), topic2.joined())
1762            .await
1763            .e()??;
1764        router1.shutdown().await.e()?;
1765        router2.shutdown().await.e()?;
1766        Ok(())
1767    }
1768
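    /// Test that nodes can join peers whose addresses they learn through gossip itself.
    ///
    /// Three nodes run without a relay; only node1's address is published via a
    /// [`StaticProvider`]. Node2 and node3 both bootstrap via node1, and node2 and node3
    /// still become neighbors of each other because gossip discovery propagates the
    /// addressing info they need.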
1769    #[tokio::test]
1770    #[traced_test]
1771    async fn gossip_rely_on_gossip_discovery() -> n0_snafu::Result<()> {
1772        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1773
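        /// Spawns an endpoint without a relay, runs a gossip instance on it, and subscribes
        /// to the test topic with no bootstrap, returning the split sender and receiver.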
1774        async fn spawn(
1775            rng: &mut impl CryptoRng,
1776        ) -> n0_snafu::Result<(NodeId, Router, Gossip, GossipSender, GossipReceiver)> {
1777            let topic_id = TopicId::from([0u8; 32]);
1778            let ep = Endpoint::builder()
1779                .secret_key(SecretKey::generate(rng))
1780                .relay_mode(RelayMode::Disabled)
1781                .bind()
1782                .await?;
1783            let node_id = ep.node_id();
1784            let gossip = Gossip::builder().spawn(ep.clone());
1785            let router = Router::builder(ep)
1786                .accept(GOSSIP_ALPN, gossip.clone())
1787                .spawn();
1788            let topic = gossip.subscribe(topic_id, vec![]).await?;
1789            let (sender, receiver) = topic.split();
1790            Ok((node_id, router, gossip, sender, receiver))
1791        }
1792
1793        // spawn 3 nodes without relay or discovery
1794        let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
1795        let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
1796        let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;
1797
1798        println!("nodes {:?}", [n1, n2, n3]);
1799
1800        // create a static discovery that has only node 1 addr info set
1801        let addr1 = r1.endpoint().node_addr();
1802        let disco = StaticProvider::new();
1803        disco.add_node_info(addr1);
1804
1805        // add addr info of node1 to node2 and join node1
1806        r2.endpoint().discovery().add(disco.clone());
1807        tx2.join_peers(vec![n1]).await?;
1808
1809        // await join node2 -> node1
1810        timeout(Duration::from_secs(3), rx1.joined()).await.e()??;
1811        timeout(Duration::from_secs(3), rx2.joined()).await.e()??;
1812
1813        // add addr info of node1 to node3 and join node1
1814        r3.endpoint().discovery().add(disco.clone());
1815        tx3.join_peers(vec![n1]).await?;
1816
1817        // await join at node3: n1 and n2
1818        // n2 only works because we use gossip discovery!
1819        let ev = timeout(Duration::from_secs(3), rx3.next()).await.e()?;
1820        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
1821        let ev = timeout(Duration::from_secs(3), rx3.next()).await.e()?;
1822        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
1823
1824        assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));
1825
1826        let ev = timeout(Duration::from_secs(3), rx2.next()).await.e()?;
1827        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));
1828
1829        let ev = timeout(Duration::from_secs(3), rx1.next()).await.e()?;
1830        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));
1831
1832        tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown()).e()?;
1833        Ok(())
1834    }
1835
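    /// Collects an iterator into a sorted `Vec`, for order-independent comparisons.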
1836    fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
1837        let mut out: Vec<_> = input.into_iter().collect();
1838        out.sort();
1839        out
1840    }
1841}