iroh_gossip/
net.rs

//! Networking for the `iroh-gossip` protocol

use std::{
    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
    net::SocketAddr,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use bytes::Bytes;
use futures_concurrency::stream::{stream_group, StreamGroup};
use futures_util::FutureExt as _;
use iroh::{
    endpoint::Connection,
    protocol::{AcceptError, ProtocolHandler},
    Endpoint, EndpointAddr, EndpointId, PublicKey, RelayUrl, Watcher,
};
use irpc::WithChannels;
use n0_error::{e, stack_error};
use n0_future::{
    task::{self, AbortOnDropHandle, JoinSet},
    time::Instant,
    Stream, StreamExt as _,
};
use rand::{rngs::StdRng, SeedableRng};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, trace, warn, Instrument};

use self::{
    discovery::GossipDiscovery,
    util::{RecvLoop, SendLoop, Timers},
};
use crate::{
    api::{self, Command, Event, GossipApi, RpcMessage},
    metrics::Metrics,
    proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
};

mod discovery;
mod util;

/// ALPN protocol name
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

/// Channel capacity for the send queue (one per connection)
const SEND_QUEUE_CAP: usize = 64;
/// Channel capacity for the ToActor message queue (single)
const TO_ACTOR_CAP: usize = 64;
/// Channel capacity for the InEvent message queue (single)
const IN_EVENT_CAP: usize = 1024;
/// Channel capacity for broadcast subscriber event queue (one per topic)
const TOPIC_EVENT_CAP: usize = 256;

/// Events emitted from the gossip protocol
pub type ProtoEvent = proto::Event<PublicKey>;
/// Commands for the gossip protocol
pub type ProtoCommand = proto::Command<PublicKey>;

type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;

/// Publish and subscribe on gossiping topics.
///
/// Each topic is a separate broadcast tree with a separate membership.
///
/// A topic has to be joined before you can publish or subscribe on it.
/// To join the swarm for a topic, you have to know the [`PublicKey`] of at least one peer that also joined the topic.
///
/// Messages published on the swarm will be delivered to all peers that joined the swarm for that
/// topic. You will also be relaying (gossiping) messages published by other peers.
///
/// With the default settings, the protocol will maintain up to 5 peer connections per topic.
///
/// Even though the [`Gossip`] is created from an [`Endpoint`], it does not accept connections
/// itself. You should run an accept loop on the [`Endpoint`] yourself, check the ALPN protocol of incoming
/// connections, and if the ALPN protocol equals [`GOSSIP_ALPN`], forward the connection to the
/// gossip actor through [`Self::handle_connection`].
///
/// The gossip actor will, however, initiate new connections to other peers by itself.
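///
/// A minimal sketch of such an accept loop (assuming an `endpoint` and a
/// `gossip` instance already exist; in most applications the iroh `Router`
/// does this for you):
///
/// ```ignore
/// while let Some(incoming) = endpoint.accept().await {
///     let connection = incoming.accept()?.await?;
///     // Only forward connections that negotiated the gossip ALPN.
///     if connection.alpn().as_deref() == Some(GOSSIP_ALPN) {
///         gossip.handle_connection(connection).await?;
///     }
/// }
/// ```
///
/// Once joined, a topic handle can be used roughly like this (sketch;
/// `topic_id` and `bootstrap` are placeholders):
///
/// ```ignore
/// let topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
/// let (sender, mut receiver) = topic.split();
/// sender.broadcast(b"hello".to_vec().into()).await?;
/// while let Some(event) = receiver.try_next().await? {
///     // handle Event::Received, Event::NeighborUp, ...
/// }
/// ```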
#[derive(Debug, Clone)]
pub struct Gossip {
    pub(crate) inner: Arc<Inner>,
}

impl std::ops::Deref for Gossip {
    type Target = GossipApi;
    fn deref(&self) -> &Self::Target {
        &self.inner.api
    }
}

#[derive(Debug)]
enum LocalActorMessage {
    HandleConnection(Connection),
    Shutdown { reply: oneshot::Sender<()> },
}

#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum Error {
    ActorDropped {},
}

impl<T> From<mpsc::error::SendError<T>> for Error {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(Error::ActorDropped)
    }
}
impl From<oneshot::error::RecvError> for Error {
    fn from(_value: oneshot::error::RecvError) -> Self {
        e!(Error::ActorDropped)
    }
}

#[derive(Debug)]
pub(crate) struct Inner {
    api: GossipApi,
    local_tx: mpsc::Sender<LocalActorMessage>,
    _actor_handle: AbortOnDropHandle<()>,
    max_message_size: usize,
    metrics: Arc<Metrics>,
}

impl ProtocolHandler for Gossip {
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        self.handle_connection(connection)
            .await
            .map_err(AcceptError::from_err)?;
        Ok(())
    }

    async fn shutdown(&self) {
        // This resolves to the inherent `Gossip::shutdown`, which takes
        // precedence over this trait method during method resolution.
        if let Err(err) = self.shutdown().await {
            warn!("error while shutting down gossip: {err:#}");
        }
    }
}

/// Builder to configure and construct [`Gossip`].
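///
/// A typical construction, as a sketch (`endpoint` is assumed to be an
/// already-bound iroh [`Endpoint`]):
///
/// ```ignore
/// let gossip = Gossip::builder()
///     .max_message_size(16 * 1024)
///     .spawn(endpoint.clone());
/// ```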
#[derive(Debug, Clone)]
pub struct Builder {
    config: proto::Config,
    alpn: Option<Bytes>,
}

impl Builder {
    /// Sets the maximum message size in bytes.
    /// By default this is `4096` bytes.
    pub fn max_message_size(mut self, size: usize) -> Self {
        self.config.max_message_size = size;
        self
    }

    /// Set the membership configuration.
    pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
        self.config.membership = config;
        self
    }

    /// Set the broadcast configuration.
    pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
        self.config.broadcast = config;
        self
    }

    /// Set the ALPN this gossip instance uses.
    ///
    /// It has to be the same for all peers in the network. If you set a custom ALPN,
    /// you have to use the same ALPN when registering the [`Gossip`] on an iroh
    /// router with [`RouterBuilder::accept`].
    ///
    /// [`RouterBuilder::accept`]: iroh::protocol::RouterBuilder::accept
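    ///
    /// For example (sketch; the ALPN string, `endpoint`, and the router wiring
    /// are illustrative):
    ///
    /// ```ignore
    /// const MY_ALPN: &[u8] = b"/my-app/gossip/0";
    /// let gossip = Gossip::builder().alpn(MY_ALPN).spawn(endpoint.clone());
    /// let router = iroh::protocol::Router::builder(endpoint)
    ///     .accept(MY_ALPN, gossip.clone())
    ///     .spawn();
    /// ```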
    pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
        self.alpn = Some(alpn.as_ref().to_vec().into());
        self
    }

    /// Spawn a gossip actor and get a handle for it
    pub fn spawn(self, endpoint: Endpoint) -> Gossip {
        let metrics = Arc::new(Metrics::default());
        let discovery = GossipDiscovery::default();
        endpoint.discovery().add(discovery.clone());
        let (actor, rpc_tx, local_tx) =
            Actor::new(endpoint, self.config, metrics.clone(), self.alpn, discovery);
        let me = actor.endpoint.id().fmt_short();
        let max_message_size = actor.state.max_message_size();

        let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));

        let api = GossipApi::local(rpc_tx);

        Gossip {
            inner: Inner {
                api,
                local_tx,
                _actor_handle: AbortOnDropHandle::new(actor_handle),
                max_message_size,
                metrics,
            }
            .into(),
        }
    }
}

impl Gossip {
    /// Creates a default `Builder`; the endpoint is provided when calling [`Builder::spawn`].
    pub fn builder() -> Builder {
        Builder {
            config: Default::default(),
            alpn: None,
        }
    }

    /// Listen on a quinn endpoint for incoming RPC connections.
    #[cfg(feature = "rpc")]
    pub async fn listen(self, endpoint: quinn::Endpoint) {
        self.inner.api.listen(endpoint).await
    }

    /// Get the maximum message size configured for this gossip actor.
    pub fn max_message_size(&self) -> usize {
        self.inner.max_message_size
    }

    /// Handle an incoming [`Connection`].
    ///
    /// Make sure to check the ALPN protocol yourself before passing the connection.
    pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
        self.inner
            .local_tx
            .send(LocalActorMessage::HandleConnection(conn))
            .await?;
        Ok(())
    }

    /// Shutdown the gossip instance.
    ///
    /// This leaves all topics, sending `Disconnect` messages to peers, and then
    /// stops the gossip actor loop and drops all state and connections.
    pub async fn shutdown(&self) -> Result<(), Error> {
        let (reply, reply_rx) = oneshot::channel();
        self.inner
            .local_tx
            .send(LocalActorMessage::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }

    /// Returns the metrics tracked for this gossip instance.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.inner.metrics
    }
}

/// Actor that sends and handles messages between the connection and main state loops
struct Actor {
    alpn: Bytes,
    /// Protocol state
    state: proto::State<PublicKey, StdRng>,
    /// The endpoint through which we dial peers
    endpoint: Endpoint,
    /// Dial machine to connect to peers
    dialer: Dialer,
    /// Input messages to the actor
    rpc_rx: mpsc::Receiver<RpcMessage>,
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Sender for the state input (cloned into the connection loops)
    in_event_tx: mpsc::Sender<InEvent>,
    /// Input events to the state (emitted from the connection loops)
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Queued timers
    timers: Timers<Timer>,
    /// Map of topics to their state.
    topics: HashMap<TopicId, TopicState>,
    /// Map of peers to their state.
    peers: HashMap<EndpointId, PeerState>,
    /// Stream of commands from topic handles.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Internal queue of topics to close because all their handles were dropped.
    quit_queue: VecDeque<TopicId>,
    /// Tasks for the connection loops, to keep track of panics.
    connection_tasks: JoinSet<(EndpointId, Connection, Result<(), ConnectionLoopError>)>,
    metrics: Arc<Metrics>,
    topic_event_forwarders: JoinSet<TopicId>,
    discovery: GossipDiscovery,
}

impl Actor {
    fn new(
        endpoint: Endpoint,
        config: proto::Config,
        metrics: Arc<Metrics>,
        alpn: Option<Bytes>,
        discovery: GossipDiscovery,
    ) -> (
        Self,
        mpsc::Sender<RpcMessage>,
        mpsc::Sender<LocalActorMessage>,
    ) {
        let peer_id = endpoint.id();
        let dialer = Dialer::new(endpoint.clone());
        let state = proto::State::new(
            peer_id,
            Default::default(),
            config,
            rand::rngs::StdRng::from_rng(&mut rand::rng()),
        );
        let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
        let (local_tx, local_rx) = mpsc::channel(16);
        let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);

        let actor = Actor {
            alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
            endpoint,
            state,
            dialer,
            rpc_rx,
            in_event_rx,
            in_event_tx,
            timers: Timers::new(),
            command_rx: StreamGroup::new().keyed(),
            peers: Default::default(),
            topics: Default::default(),
            quit_queue: Default::default(),
            connection_tasks: Default::default(),
            metrics,
            local_rx,
            topic_event_forwarders: Default::default(),
            discovery,
        };

        (actor, rpc_tx, local_tx)
    }

    pub async fn run(mut self) {
        let mut addr_update_stream = self.setup().await;

        let mut i = 0;
        while self.event_loop(&mut addr_update_stream, i).await {
            i += 1;
        }
    }

    /// Performs the initial actor setup to run the [`Actor::event_loop`].
    ///
    /// This publishes our current address to the protocol state and returns the
    /// stream of address updates to feed into the event loop.
    async fn setup(&mut self) -> impl Stream<Item = EndpointAddr> + Send + Unpin + use<> {
        let addr_update_stream = self.endpoint.watch_addr().stream();
        let initial_addr = self.endpoint.addr();
        self.handle_addr_update(initial_addr).await;
        addr_update_stream
    }

    /// One event loop processing step.
    ///
    /// Returns `false` when no further processing should be performed.
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = EndpointAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        tokio::select! {
            biased;
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        self.handle_connection(conn.remote_id(), ConnOrigin::Accept, conn);
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    None => {
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                let now = Instant::now();
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }

    async fn handle_addr_update(&mut self, endpoint_addr: EndpointAddr) {
        let peer_data = encode_peer_data(&endpoint_addr.into());
        self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
            .await
    }

    async fn handle_command(
        &mut self,
        topic: TopicId,
        key: stream_group::Key,
        command: Option<Command>,
    ) {
        debug!(?topic, ?key, ?command, "handle command");
        let Some(state) = self.topics.get_mut(&topic) else {
            // TODO: unreachable?
            warn!("received command for unknown topic");
            return;
        };
        match command {
            Some(command) => {
                let command = match command {
                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
                    Command::BroadcastNeighbors(message) => {
                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                    }
                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
                };
                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
                    .await;
            }
            None => {
                state.command_rx_keys.remove(&key);
                if !state.still_needed() {
                    self.quit_queue.push_back(topic);
                    self.process_quit_queue().await;
                }
            }
        }
    }

    fn handle_connection(&mut self, peer_id: EndpointId, origin: ConnOrigin, conn: Connection) {
        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
        let conn_id = conn.stable_id();

        let queue = match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
            Entry::Vacant(entry) => {
                entry.insert(PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                });
                Vec::new()
            }
        };

        let max_message_size = self.state.max_message_size();
        let in_event_tx = self.in_event_tx.clone();

        // Spawn a task for this connection
        self.connection_tasks.spawn(
            async move {
                let res = connection_loop(
                    peer_id,
                    conn.clone(),
                    origin,
                    send_rx,
                    in_event_tx,
                    max_message_size,
                    queue,
                )
                .await;
                (peer_id, conn, res)
            }
            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
        );
    }

    #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
    async fn handle_connection_task_finished(
        &mut self,
        peer_id: EndpointId,
        conn: Connection,
        task_result: Result<(), ConnectionLoopError>,
    ) {
        if conn.close_reason().is_none() {
            conn.close(0u32.into(), b"close from disconnect");
        }
        let reason = conn.close_reason().expect("just closed");
        let error = task_result.err();
        debug!(%reason, ?error, "connection closed");
        if let Some(PeerState::Active {
            active_conn_id,
            other_conns,
            ..
        }) = self.peers.get_mut(&peer_id)
        {
            if conn.stable_id() == *active_conn_id {
                debug!("active send connection closed, mark peer as disconnected");
                self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                    .await;
            } else {
                other_conns.retain(|x| *x != conn.stable_id());
                debug!("remaining {} other connections", other_conns.len() + 1);
            }
        } else {
            debug!("peer already marked as disconnected");
        }
    }

    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
        trace!("handle to_actor {msg:?}");
        match msg {
            RpcMessage::Join(msg) => {
                let WithChannels {
                    inner,
                    rx,
                    tx,
                    // TODO(frando): make use of span?
                    span: _,
                } = msg;
                let api::JoinRequest {
                    topic_id,
                    bootstrap,
                } = inner;
                let TopicState {
                    neighbors,
                    event_sender,
                    command_rx_keys,
                } = self.topics.entry(topic_id).or_default();
                let mut sender_dead = false;
                if !neighbors.is_empty() {
                    for neighbor in neighbors.iter() {
                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
                            sender_dead = true;
                            break;
                        }
                    }
                }

                if !sender_dead {
                    let fut =
                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
                    self.topic_event_forwarders
                        .spawn(fut.instrument(tracing::Span::current()));
                }
                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
                let key = self.command_rx.insert(command_rx);
                command_rx_keys.insert(key);

                self.handle_in_event(
                    InEvent::Command(
                        topic_id,
                        ProtoCommand::Join(bootstrap.into_iter().collect()),
                    ),
                    now,
                )
                .await;
            }
        }
    }

    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }

    async fn process_quit_queue(&mut self) {
        while let Some(topic_id) = self.quit_queue.pop_front() {
            self.handle_in_event_inner(
                InEvent::Command(topic_id, ProtoCommand::Quit),
                Instant::now(),
            )
            .await;
            if self.topics.remove(&topic_id).is_some() {
                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
            }
        }
    }

    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                // Removing the peer is handled by the in_event PeerDisconnected sent
                                // in [`Self::handle_connection_task_finished`].
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        // TODO: unreachable?
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    // signal disconnection by dropping the senders to the connection
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(endpoint_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {endpoint_id}: {err}"),
                    Ok(info) => {
                        debug!(peer = ?endpoint_id, "add known addrs: {info:?}");
                        let mut endpoint_addr = EndpointAddr::new(endpoint_id);
                        for addr in info.direct_addresses {
                            endpoint_addr = endpoint_addr.with_ip_addr(addr);
                        }
                        if let Some(relay_url) = info.relay_url {
                            endpoint_addr = endpoint_addr.with_relay_url(relay_url);
                        }

                        self.discovery.add(endpoint_addr);
                    }
                },
            }
        }
    }
}

type ConnId = usize;

#[derive(Debug)]
enum PeerState {
    Pending {
        queue: Vec<ProtoMessage>,
    },
    Active {
        active_send_tx: mpsc::Sender<ProtoMessage>,
        active_conn_id: ConnId,
        other_conns: Vec<ConnId>,
    },
}

impl PeerState {
    fn accept_conn(
        &mut self,
        send_tx: mpsc::Sender<ProtoMessage>,
        conn_id: ConnId,
    ) -> Vec<ProtoMessage> {
        match self {
            PeerState::Pending { queue } => {
                let queue = std::mem::take(queue);
                *self = PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                };
                queue
            }
            PeerState::Active {
                active_send_tx,
                active_conn_id,
                other_conns,
            } => {
                // We already have an active connection. We keep the old connection intact,
                // but only use the new connection for sending from now on.
                // By dropping the `send_tx` of the old connection, the send loop part of
                // the `connection_loop` of the old connection will terminate, which will also
                // notify the peer that the old connection may be dropped.
                other_conns.push(*active_conn_id);
                *active_send_tx = send_tx;
                *active_conn_id = conn_id;
                Vec::new()
            }
        }
    }
}

impl Default for PeerState {
    fn default() -> Self {
        PeerState::Pending { queue: Vec::new() }
    }
}

#[derive(Debug)]
struct TopicState {
    neighbors: BTreeSet<EndpointId>,
    event_sender: broadcast::Sender<ProtoEvent>,
    /// Keys identifying command receivers in [`Actor::command_rx`].
    ///
    /// This represents the receiver side of gossip's publish public API.
    command_rx_keys: HashSet<stream_group::Key>,
}

impl Default for TopicState {
    fn default() -> Self {
        let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
        Self {
            neighbors: Default::default(),
            command_rx_keys: Default::default(),
            event_sender,
        }
    }
}

impl TopicState {
    /// Check if the topic still has any publisher or subscriber.
    fn still_needed(&self) -> bool {
        // Keep topic alive if either senders or receivers exist.
        // Using || prevents topic closure when senders are dropped while receivers listen.
        !self.command_rx_keys.is_empty() || self.event_sender.receiver_count() > 0
    }

    #[cfg(test)]
    fn joined(&self) -> bool {
        !self.neighbors.is_empty()
    }
}

/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    Accept,
    Dial,
}

#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
enum ConnectionLoopError {
    #[error(transparent)]
    Write {
        source: self::util::WriteError,
    },
    #[error(transparent)]
    Read {
        source: self::util::ReadError,
    },
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: iroh::endpoint::ConnectionError,
    },
    ActorDropped {},
}

impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(ConnectionLoopError::ActorDropped)
    }
}

async fn connection_loop(
    from: PublicKey,
    conn: Connection,
    origin: ConnOrigin,
    send_rx: mpsc::Receiver<ProtoMessage>,
    in_event_tx: mpsc::Sender<InEvent>,
    max_message_size: usize,
    queue: Vec<ProtoMessage>,
) -> Result<(), ConnectionLoopError> {
    debug!(?origin, "connection established");

    let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
    let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);

    let send_fut = send_loop.run(queue).instrument(error_span!("send"));
    let recv_fut = recv_loop.run().instrument(error_span!("recv"));

    let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
    send_res?;
    recv_res?;
    Ok(())
}

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct AddrInfo {
    relay_url: Option<RelayUrl>,
    direct_addresses: BTreeSet<SocketAddr>,
}

impl From<EndpointAddr> for AddrInfo {
    fn from(endpoint_addr: EndpointAddr) -> Self {
        Self {
            relay_url: endpoint_addr.relay_urls().next().cloned(),
            direct_addresses: endpoint_addr.ip_addrs().cloned().collect(),
        }
    }
}

fn encode_peer_data(info: &AddrInfo) -> PeerData {
    let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo does not fail");
    PeerData::new(bytes)
}

fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
    let bytes = peer_data.as_bytes();
    if bytes.is_empty() {
        return Ok(AddrInfo::default());
    }
    let info = postcard::from_bytes(bytes)?;
    Ok(info)
}

async fn topic_subscriber_loop(
    sender: irpc::channel::mpsc::Sender<Event>,
    mut topic_events: broadcast::Receiver<ProtoEvent>,
) {
    loop {
        tokio::select! {
            biased;
            msg = topic_events.recv() => {
                let event = match msg {
                    Err(broadcast::error::RecvError::Closed) => break,
                    Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
                    Ok(event) => event.into(),
                };
                if sender.send(event).await.is_err() {
                    break;
                }
            }
            _ = sender.closed() => break,
        }
    }
}

/// A stream of commands for a gossip subscription.
type BoxedCommandReceiver =
    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;

#[derive(derive_more::Debug)]
struct TopicCommandStream {
    topic_id: TopicId,
    #[debug("CommandStream")]
    stream: BoxedCommandReceiver,
    closed: bool,
}

impl TopicCommandStream {
    fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
        Self {
            topic_id,
            stream,
            closed: false,
        }
    }
}

impl Stream for TopicCommandStream {
    type Item = (TopicId, Option<Command>);
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.closed {
            return Poll::Ready(None);
        }
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
            Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
                self.closed = true;
                Poll::Ready(Some((self.topic_id, None)))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    pending: JoinSet<(
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    )>,
    pending_dials: HashMap<EndpointId, CancellationToken>,
}

impl Dialer {
    /// Create a new dialer for an [`Endpoint`]
    fn new(endpoint: Endpoint) -> Self {
        Self {
            endpoint,
            pending: Default::default(),
            pending_dials: Default::default(),
        }
    }

    /// Starts to dial an endpoint by [`EndpointId`].
    fn queue_dial(&mut self, endpoint_id: EndpointId, alpn: Bytes) {
        if self.is_pending(endpoint_id) {
            return;
        }
        let cancel = CancellationToken::new();
        self.pending_dials.insert(endpoint_id, cancel.clone());
        let endpoint = self.endpoint.clone();
        self.pending.spawn(
            async move {
                let res = tokio::select! {
                    biased;
                    _ = cancel.cancelled() => None,
                    res = endpoint.connect(endpoint_id, &alpn) => Some(res),
                };
                (endpoint_id, res)
            }
            .instrument(tracing::Span::current()),
        );
    }

    /// Checks if an endpoint is currently being dialed.
    fn is_pending(&self, endpoint: EndpointId) -> bool {
        self.pending_dials.contains_key(&endpoint)
    }

    /// Waits for the next dial operation to complete.
    ///
    /// `None` means the dial was cancelled before it completed.
    async fn next_conn(
        &mut self,
    ) -> (
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    ) {
        if self.pending_dials.is_empty() {
            return std::future::pending().await;
        }
        loop {
            match self.pending.join_next().await {
                Some(Ok((endpoint_id, res))) => {
                    self.pending_dials.remove(&endpoint_id);
                    return (endpoint_id, res);
                }
                Some(Err(e)) => {
                    error!("next conn error: {:?}", e);
                }
                None => {
                    error!("no more pending conns available");
                    std::future::pending().await
                }
            }
        }
    }
}

#[cfg(test)]
pub(crate) mod test {
    use std::{future::Future, time::Duration};

    use bytes::Bytes;
    use futures_concurrency::future::TryJoin;
    use iroh::{
        discovery::static_provider::StaticProvider, endpoint::BindError, protocol::Router,
        RelayMap, RelayMode, SecretKey,
    };
    use n0_error::{AnyError, Result, StdResultExt};
    use n0_tracing_test::traced_test;
    use rand::{CryptoRng, Rng};
    use tokio::{spawn, time::timeout};
    use tokio_util::sync::CancellationToken;
    use tracing::{info, instrument};

    use super::*;
    use crate::api::{ApiError, GossipReceiver, GossipSender};

    struct ManualActorLoop {
        actor: Actor,
        step: usize,
    }

    impl std::ops::Deref for ManualActorLoop {
        type Target = Actor;

        fn deref(&self) -> &Self::Target {
            &self.actor
        }
    }

    impl std::ops::DerefMut for ManualActorLoop {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.actor
        }
    }

    type EndpointHandle = tokio::task::JoinHandle<Result<()>>;

    impl ManualActorLoop {
        #[instrument(skip_all, fields(me = %actor.endpoint.id().fmt_short()))]
        async fn new(mut actor: Actor) -> Self {
            let _ = actor.setup().await;
            Self { actor, step: 0 }
        }

        #[instrument(skip_all, fields(me = %self.endpoint.id().fmt_short()))]
        async fn step(&mut self) -> bool {
            let ManualActorLoop { actor, step } = self;
            *step += 1;
            // Ignore updates that change our published address. This gives us better control
            // over events, since the endpoint is no longer emitting changes.
            let addr_update_stream = &mut futures_lite::stream::pending();
            actor.event_loop(addr_update_stream, *step).await
        }

        async fn steps(&mut self, n: usize) {
            for _ in 0..n {
                self.step().await;
            }
        }

        async fn finish(mut self) {
            while self.step().await {}
        }
    }

    impl Gossip {
        /// Creates a testing gossip instance and its actor without spawning it.
        ///
        /// This creates the endpoint and spawns the endpoint loop as well. The handle for the
        /// endpoint task is returned along with the gossip instance and actor. Since the actor is
        /// not actually spawned as [`Builder::spawn`] would, the gossip instance will have a
        /// handle to a dummy task instead.
        async fn t_new_with_actor(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Actor, EndpointHandle), BindError> {
            let endpoint = create_endpoint(rng, relay_map, None).await?;
            let metrics = Arc::new(Metrics::default());
            let discovery = GossipDiscovery::default();
            endpoint.discovery().add(discovery.clone());

            let (actor, to_actor_tx, conn_tx) =
                Actor::new(endpoint, config, metrics.clone(), None, discovery);
            let max_message_size = actor.state.max_message_size();

            let _actor_handle =
                AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
            let gossip = Self {
                inner: Inner {
                    api: GossipApi::local(to_actor_tx),
                    local_tx: conn_tx,
                    _actor_handle,
                    max_message_size,
                    metrics,
                }
                .into(),
            };

            let endpoint_task = task::spawn(endpoint_loop(
                actor.endpoint.clone(),
                gossip.clone(),
                cancel.child_token(),
            ));

            Ok((gossip, actor, endpoint_task))
        }

        /// Creates a new testing gossip instance with the normal actor loop.
        async fn t_new(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
            let (g, actor, ep_handle) =
                Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
            let ep = actor.endpoint.clone();
            let me = ep.id().fmt_short();
            let actor_handle =
                task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
            Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
        }
    }

    pub(crate) async fn create_endpoint(
        rng: &mut rand_chacha::ChaCha12Rng,
        relay_map: RelayMap,
        static_provider: Option<StaticProvider>,
    ) -> Result<Endpoint, BindError> {
        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .secret_key(SecretKey::generate(rng))
            .alpns(vec![GOSSIP_ALPN.to_vec()])
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .await?;

        if let Some(static_provider) = static_provider {
            ep.discovery().add(static_provider);
        }
        ep.online().await;
        Ok(ep)
    }

    async fn endpoint_loop(
        endpoint: Endpoint,
        gossip: Gossip,
        cancel: CancellationToken,
    ) -> Result<()> {
        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => break,
                incoming = endpoint.accept() => match incoming {
                    None => break,
                    Some(incoming) => {
                        let connecting = match incoming.accept() {
                            Ok(connecting) => connecting,
                            Err(err) => {
                                warn!("incoming connection failed: {err:#}");
                                // we can carry on in these cases:
                                // this can be caused by retransmitted datagrams
                                continue;
                            }
                        };
                        let connection = connecting
                            .await
                            .std_context("await incoming connection")?;
                        gossip.handle_connection(connection).await?
                    }
                }
            }
        }
        Ok(())
    }
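
    // A small round-trip check for the peer-data encoding defined above. This is
    // an added sketch: it only exercises `encode_peer_data`/`decode_peer_data`
    // with a default `AddrInfo`, mirroring how the actor exchanges peer data.
    #[test]
    fn peer_data_roundtrip() {
        let info = AddrInfo::default();
        let decoded = decode_peer_data(&encode_peer_data(&info)).expect("decodes");
        assert_eq!(decoded.relay_url, info.relay_url);
        assert_eq!(decoded.direct_addresses, info.direct_addresses);
    }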

    #[tokio::test]
    #[traced_test]
    async fn gossip_net_smoke() {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let static_provider = StaticProvider::new();

        let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
            .await
            .unwrap();
        let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
            .await
            .unwrap();
        let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
            .await
            .unwrap();

        let go1 = Gossip::builder().spawn(ep1.clone());
        let go2 = Gossip::builder().spawn(ep2.clone());
        let go3 = Gossip::builder().spawn(ep3.clone());
        debug!("peer1 {:?}", ep1.id());
        debug!("peer2 {:?}", ep2.id());
        debug!("peer3 {:?}", ep3.id());
        let pi1 = ep1.id();
        let pi2 = ep2.id();

        let cancel = CancellationToken::new();
        let tasks = [
            spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
            spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
            spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
        ];

        debug!("----- adding peers  ----- ");
        let topic: TopicId = blake3::hash(b"foobar").into();

        let addr1 = EndpointAddr::new(pi1).with_relay_url(relay_url.clone());
        let addr2 = EndpointAddr::new(pi2).with_relay_url(relay_url);
        static_provider.add_endpoint_info(addr1.clone());
        static_provider.add_endpoint_info(addr2.clone());

        debug!("----- joining  ----- ");
        // join the topics and wait for the connection to succeed
        let [sub1, mut sub2, mut sub3] = [
            go1.subscribe_and_join(topic, vec![]),
            go2.subscribe_and_join(topic, vec![pi1]),
            go3.subscribe_and_join(topic, vec![pi2]),
        ]
        .try_join()
        .await
        .unwrap();

        let (sink1, _stream1) = sub1.split();

        let len = 2;

        // publish messages on endpoint1
        let pub1 = spawn(async move {
            for i in 0..len {
                let message = format!("hi{i}");
                info!("go1 broadcast: {message:?}");
                sink1.broadcast(message.into_bytes().into()).await.unwrap();
                tokio::time::sleep(Duration::from_micros(1)).await;
            }
        });

        // wait for messages on endpoint2
        let sub2 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub2.next().await.unwrap().unwrap();
                info!("go2 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        // wait for messages on endpoint3
        let sub3 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub3.next().await.unwrap().unwrap();
                info!("go3 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        timeout(Duration::from_secs(10), pub1)
            .await
            .unwrap()
            .unwrap();
        let recv2 = timeout(Duration::from_secs(10), sub2)
            .await
            .unwrap()
            .unwrap();
        let recv3 = timeout(Duration::from_secs(10), sub3)
            .await
            .unwrap()
            .unwrap();

        let expected: Vec<Bytes> = (0..len)
            .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
            .collect();
        assert_eq!(recv2, expected);
        assert_eq!(recv3, expected);

        cancel.cancel();
        for t in tasks {
            timeout(Duration::from_secs(10), t)
                .await
                .unwrap()
                .unwrap()
                .unwrap();
        }
    }

    /// Test that when a gossip topic is no longer needed it's actually unsubscribed.
    ///
    /// This test will:
    /// - Create two endpoints, the first using a manual event loop.
    /// - Subscribe both endpoints to the same topic. The first endpoint will subscribe twice and connect
    ///   to the second endpoint. The second endpoint will subscribe without bootstrap.
    /// - Ensure that the first endpoint removes the subscription iff all topic handles have been
    ///   dropped.
1373    #[tokio::test]
1374    #[traced_test]
1375    async fn subscription_cleanup() -> Result {
1376        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1377        let ct = CancellationToken::new();
1378        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1379
1380        // create the first endpoint with a manual actor loop
1381        let (go1, actor, ep1_handle) =
1382            Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
1383        let mut actor = ManualActorLoop::new(actor).await;
1384
1385        // create the second endpoint with the usual actor loop
1386        let (go2, ep2, ep2_handle, _test_actor_handle) =
1387            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1388
1389        let endpoint_id1 = actor.endpoint.id();
1390        let endpoint_id2 = ep2.id();
1391        tracing::info!(
1392            endpoint_1 = %endpoint_id1.fmt_short(),
1393            endpoint_2 = %endpoint_id2.fmt_short(),
1394            "endpoints ready"
1395        );
1396
1397        let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
1398        tracing::info!(%topic, "joining");
1399
1400        // create the tasks for each gossip instance:
1401        // - second endpoint subscribes once without bootstrap and listens to events
1402        // - first endpoint subscribes twice with the second endpoint as bootstrap. This is done on command
1403        //   from the main task (this)
1404
1405        // second endpoint
1406        let ct2 = ct.clone();
1407        let go2_task = async move {
1408            let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();
1409
1410            let subscribe_fut = async {
1411                while let Some(ev) = sub_rx.try_next().await? {
1412                    match ev {
1413                        Event::Lagged => tracing::debug!("missed some messages :("),
1414                        Event::Received(_) => unreachable!("test does not send messages"),
1415                        other => tracing::debug!(?other, "gs event"),
1416                    }
1417                }
1418
1419                tracing::debug!("subscribe stream ended");
1420                Ok::<_, AnyError>(())
1421            };
1422
1423            tokio::select! {
1424                _ = ct2.cancelled() => Ok(()),
1425                res = subscribe_fut => res,
1426            }
1427        }
1428        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
1429        let go2_handle = task::spawn(go2_task);
1430
1431        // first endpoint
1432        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
1433        let static_provider = StaticProvider::new();
1434        static_provider.add_endpoint_info(addr2);
1435        actor.endpoint.discovery().add(static_provider);
1436        // we use a channel to signal advancing steps to the task
1437        let (tx, mut rx) = mpsc::channel::<()>(1);
1438        let ct1 = ct.clone();
1439        let go1_task = async move {
1440            // first subscribe is done immediately
1441            tracing::info!("subscribing the first time");
1442            let sub_1a = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1443
1444            // wait for signal to subscribe a second time
1445            rx.recv().await.expect("signal for second subscribe");
1446            tracing::info!("subscribing a second time");
1447            let sub_1b = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1448            drop(sub_1a);
1449
1450            // wait for signal to drop the second handle as well
            rx.recv().await.expect("signal to drop handles");
            tracing::info!("dropping all handles");
            drop(sub_1b);

            // wait for cancellation
            ct1.cancelled().await;
            drop(go1);

            Ok::<_, AnyError>(())
        }
        .instrument(tracing::debug_span!("endpoint_1", %endpoint_id1));
        let go1_handle = task::spawn(go1_task);

        // advance and check that the topic is now subscribed
        actor.steps(3).await; // handle our subscribe;
                              // get peer connection;
                              // receive the other peer's information for a NeighborUp
        let state = actor.topics.get(&topic).expect("get registered topic");
        assert!(state.joined());

        // signal the second subscribe, we should remain subscribed
        tx.send(())
            .await
            .std_context("signal additional subscribe")?;
        actor.steps(3).await; // subscribe; first receiver gone; first sender gone
        let state = actor.topics.get(&topic).expect("get registered topic");
        assert!(state.joined());

        // signal to drop the second handle, the topic should no longer be subscribed
        tx.send(()).await.std_context("signal drop handles")?;
        actor.steps(2).await; // second receiver gone; second sender gone
        assert!(!actor.topics.contains_key(&topic));

        // cleanup and ensure everything went as expected
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go1_handle)
            .await
            .std_context("wait gossip1 task")?
            .std_context("join gossip1 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;
        timeout(wait, actor.finish())
            .await
            .std_context("wait actor finish")?;

        Ok(())
    }

    /// Test that endpoints can reconnect to each other.
    ///
    /// This test creates two endpoints subscribed to the same topic. The second endpoint
    /// unsubscribes and then resubscribes; the connection between the endpoints should succeed
    /// both times.
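    ///
    /// A sketch of the pattern under test (illustrative only, not compiled):
    /// ```ignore
    /// let mut sub = gossip.subscribe(topic, vec![peer]).await?;
    /// sub.joined().await?; // first join succeeds
    /// drop(sub);           // leave the topic
    /// let mut sub = gossip.subscribe(topic, vec![peer]).await?;
    /// sub.joined().await?; // rejoining must succeed as well
    /// ```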
    // NOTE: This is a regression test
    #[tokio::test]
    #[traced_test]
    async fn can_reconnect() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let (go1, ep1, ep1_handle, _test_actor_handle1) =
            Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;

        let (go2, ep2, ep2_handle, _test_actor_handle2) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = ep1.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"can_reconnect").into();
        tracing::info!(%topic, "joining");

        let ct2 = ct.child_token();
        // channel used to signal the second gossip instance to advance the test
        let (tx, mut rx) = mpsc::channel::<()>(1);
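        // seed endpoint_2's discovery with endpoint_1's relay address so the resubscribe can dial it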
        let addr1 = EndpointAddr::new(endpoint_id1).with_relay_url(relay_url.clone());
        let static_provider = StaticProvider::new();
        static_provider.add_endpoint_info(addr1);
        ep2.discovery().add(static_provider.clone());
        let go2_task = async move {
            let mut sub = go2.subscribe(topic, Vec::new()).await?;
            sub.joined().await?;

            rx.recv().await.expect("signal to unsubscribe");
            tracing::info!("unsubscribing");
            drop(sub);

            rx.recv().await.expect("signal to subscribe again");
            tracing::info!("resubscribing");
            let mut sub = go2.subscribe(topic, vec![endpoint_id1]).await?;

            sub.joined().await?;
            tracing::info!("subscription successful!");

            ct2.cancelled().await;

            Ok::<_, ApiError>(())
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        static_provider.add_endpoint_info(addr2);
        ep1.discovery().add(static_provider);

        let mut sub = go1.subscribe(topic, vec![endpoint_id2]).await?;
        // wait for subscribed notification
        sub.joined().await?;

        // signal endpoint_2 to unsubscribe
        tx.send(()).await.std_context("signal unsubscribe")?;

        // we should receive a NeighborDown event
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor down")??;
        assert_eq!(ev, Some(Event::NeighborDown(endpoint_id2)));
        tracing::info!("endpoint 2 left");

        // signal endpoint_2 to subscribe again
        tx.send(()).await.std_context("signal resubscribe")?;

        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor up")??;
        assert_eq!(ev, Some(Event::NeighborUp(endpoint_id2)));
        tracing::info!("endpoint 2 rejoined!");

        // cleanup and ensure everything went as expected
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;

        Result::Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn can_die_and_reconnect() -> Result {
        /// Runs a future in a separate runtime on a separate thread, cancelling everything
        /// abruptly once `cancel` is invoked.
        fn run_in_thread<T: Send + 'static>(
            cancel: CancellationToken,
            fut: impl std::future::Future<Output = T> + Send + 'static,
        ) -> std::thread::JoinHandle<Option<T>> {
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(async move { cancel.run_until_cancelled(fut).await })
            })
        }

        /// Spawns a new endpoint and gossip instance.
        async fn spawn_gossip(
            secret_key: SecretKey,
            relay_map: RelayMap,
        ) -> Result<(Router, Gossip), BindError> {
            let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
                .secret_key(secret_key)
                .insecure_skip_relay_cert_verify(true)
                .bind()
                .await?;
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            Ok((router, gossip))
        }

        /// Spawns a gossip endpoint, broadcasts a single message, then sleeps until cancelled externally.
        async fn broadcast_once(
            secret_key: SecretKey,
            relay_map: RelayMap,
            bootstrap_addr: EndpointAddr,
            topic_id: TopicId,
            message: String,
        ) -> Result {
            let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
            info!(endpoint_id = %router.endpoint().id().fmt_short(), "broadcast endpoint spawned");
            let bootstrap = vec![bootstrap_addr.id];
            let static_provider = StaticProvider::new();
            static_provider.add_endpoint_info(bootstrap_addr);
            router.endpoint().discovery().add(static_provider);
            let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
            topic.broadcast(message.as_bytes().to_vec().into()).await?;
            std::future::pending::<()>().await;
            Ok(())
        }

        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let topic_id = TopicId::from_bytes(rng.random());

        // spawn a gossip endpoint, send the endpoint's address on addr_tx,
        // then forward every received broadcast message on msgs_recv_tx.
        let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
        let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
        let recv_task = tokio::task::spawn({
            let relay_map = relay_map.clone();
            let secret_key = SecretKey::generate(&mut rng);
            async move {
                let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
                // wait for the relay to be set. iroh currently has issues when trying
                // to immediately reconnect with changed direct addresses, but when the
                // relay path is available it works.
                // See https://github.com/n0-computer/iroh/pull/3372
                router.endpoint().online().await;
                let addr = router.endpoint().addr();
                info!(endpoint_id = %addr.id.fmt_short(), "recv endpoint spawned");
                addr_tx.send(addr).unwrap();
                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
                while let Some(event) = topic.try_next().await.unwrap() {
                    if let Event::Received(message) = event {
                        let message = std::str::from_utf8(&message.content)
                            .std_context("decode broadcast message")?
                            .to_string();
                        msgs_recv_tx
                            .send(message)
                            .await
                            .std_context("forward received message")?;
                    }
                }
                Ok::<_, AnyError>(())
            }
        });

        let endpoint0_addr = addr_rx.await.std_context("receive endpoint address")?;
        let max_wait = Duration::from_secs(5);

        // spawn an endpoint, send a message, and then terminate the endpoint ungracefully
        // after the message was received on our receiver endpoint.
        let cancel = CancellationToken::new();
        let secret = SecretKey::generate(&mut rng);
        let join_handle_1 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg1".to_string(),
            ),
        );
        // assert that we received the message on the receiver endpoint.
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for first broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg1");
        info!("kill broadcast endpoint");
        cancel.cancel();

        // spawn the endpoint again with the same endpoint id, and send another message
        let cancel = CancellationToken::new();
        let join_handle_2 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg2".to_string(),
            ),
        );
        // assert that we received the message on the receiver endpoint.
        // this means that the reconnect with the same endpoint id worked.
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for second broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg2");
        info!("kill broadcast endpoint");
        cancel.cancel();

        info!("kill recv endpoint");
        recv_task.abort();
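        // `run_until_cancelled` returns `None` when the token fires before the future
        // completes, so both broadcast tasks must have been cancelled, not finished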
        assert!(join_handle_1.join().unwrap().is_none());
        assert!(join_handle_2.join().unwrap().is_none());

        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn gossip_change_alpn() -> n0_error::Result<()> {
        let alpn = b"my-gossip-alpn";
        let topic_id = TopicId::from([0u8; 32]);

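        // both endpoints register the custom ALPN, so connections are negotiated on it
        // instead of the default GOSSIP_ALPN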
        let ep1 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let ep2 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
        let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
        let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
        let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();

        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let static_provider = StaticProvider::new();
        static_provider.add_endpoint_info(addr1);
        router2.endpoint().discovery().add(static_provider);

        let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        timeout(Duration::from_secs(3), topic1.joined())
            .await
            .std_context("wait topic1 join")??;
        timeout(Duration::from_secs(3), topic2.joined())
            .await
            .std_context("wait topic2 join")??;
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }

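    /// Test that a mesh can form when only endpoint 1's address is known upfront:
    /// gossip discovery spreads the other peers' addresses as they join the topic.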
    #[tokio::test]
    #[traced_test]
    async fn gossip_rely_on_gossip_discovery() -> n0_error::Result<()> {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);

        async fn spawn(
            rng: &mut impl CryptoRng,
        ) -> n0_error::Result<(EndpointId, Router, Gossip, GossipSender, GossipReceiver)> {
            let topic_id = TopicId::from([0u8; 32]);
            let ep = Endpoint::empty_builder(RelayMode::Disabled)
                .secret_key(SecretKey::generate(rng))
                .bind()
                .await?;
            let endpoint_id = ep.id();
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            let topic = gossip.subscribe(topic_id, vec![]).await?;
            let (sender, receiver) = topic.split();
            Ok((endpoint_id, router, gossip, sender, receiver))
        }

        // spawn 3 endpoints without relay or discovery
        let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
        let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
        let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;

        println!("endpoints {:?}", [n1, n2, n3]);

        // create a static discovery that has only endpoint 1 addr info set
        let addr1 = r1.endpoint().addr();
        let disco = StaticProvider::new();
        disco.add_endpoint_info(addr1);

        // add addr info of endpoint1 to endpoint2 and join endpoint1
        r2.endpoint().discovery().add(disco.clone());
        tx2.join_peers(vec![n1]).await?;

        // await join endpoint2 -> endpoint1
        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // add addr info of endpoint1 to endpoint3 and join endpoint1
        r3.endpoint().discovery().add(disco.clone());
        tx3.join_peers(vec![n1]).await?;

        // await join at endpoint3: n1 and n2
        // n2 only works because we use gossip discovery!
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 first neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 second neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));

        assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));

        let ev = timeout(Duration::from_secs(3), rx2.next())
            .await
            .std_context("wait rx2 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        let ev = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait rx1 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown())
            .std_context("shutdown routers")?;
        Ok(())
    }

    fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
        let mut out: Vec<_> = input.into_iter().collect();
        out.sort();
        out
    }

    /// Test that dropping the sender doesn't close the topic while the receiver is still listening.
    ///
    /// This is a common footgun: users split a GossipTopic, drop the sender early,
    /// and expect the receiver to keep working. With the bug (using && in still_needed),
    /// the topic closes immediately when the sender is dropped.
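    ///
    /// A sketch of the footgun pattern (illustrative only, not compiled):
    /// ```ignore
    /// let (tx, mut rx) = gossip.subscribe(topic, bootstrap).await?.split();
    /// drop(tx); // this endpoint only listens
    /// while let Some(event) = rx.try_next().await? {
    ///     // the receiver must keep delivering events
    /// }
    /// ```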
    #[tokio::test]
    #[traced_test]
    async fn topic_stays_alive_after_sender_drop() -> n0_error::Result<()> {
        let topic_id = TopicId::from([99u8; 32]);

        let ep1 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let ep2 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let gossip1 = Gossip::builder().spawn(ep1.clone());
        let gossip2 = Gossip::builder().spawn(ep2.clone());
        let router1 = Router::builder(ep1)
            .accept(crate::ALPN, gossip1.clone())
            .spawn();
        let router2 = Router::builder(ep2)
            .accept(crate::ALPN, gossip2.clone())
            .spawn();

        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let static_provider = StaticProvider::new();
        static_provider.add_endpoint_info(addr1);
        router2.endpoint().discovery().add(static_provider);

        let topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        let (tx1, mut rx1) = topic1.split();
        let (tx2, mut rx2) = topic2.split();

        // Wait for mesh to form
        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // Node 1 drops its sender - simulating the footgun where the user drops the sender early
        drop(tx1);

        // Node 2 sends a message - receiver on node 1 should still get it
        tx2.broadcast(b"hello from node2".to_vec().into()).await?;

        // Node 1's receiver should still work and receive the message
        let event = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait for message on rx1")?;

        match event {
            Some(Ok(Event::Received(msg))) => {
                assert_eq!(&msg.content[..], b"hello from node2");
            }
            other => panic!("expected Received event, got {:?}", other),
        }

        drop(tx2);
        drop(rx1);
        drop(rx2);
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }

    /// Test that peers can reconnect after one goes offline and comes back.
    ///
    /// This reproduces a scenario where:
    /// 1. Peer A starts with a fixed secret key
    /// 2. Peer B joins using A as bootstrap
    /// 3. Peer A goes offline (the connection is closed from B's perspective)
    /// 4. Peer A comes back online and attempts to rejoin using B as bootstrap
    /// 5. Connectivity is restored.
    #[tokio::test]
    #[traced_test]
    async fn peer_reconnect_after_offline() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(42);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        let topic_id = TopicId::from([42u8; 32]);
        let disco = StaticProvider::new();

        let secret_key_a = SecretKey::generate(rng);
        let secret_key_b = SecretKey::generate(rng);

        // Start peer A
        let ep_a = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .secret_key(secret_key_a.clone())
            .insecure_skip_relay_cert_verify(true)
            .discovery(disco.clone())
            .bind()
            .await?;
        disco.set_endpoint_info(EndpointAddr::new(ep_a.id()).with_relay_url(relay_url.clone()));
        let mut gossip_a = Gossip::builder().spawn(ep_a.clone());
        let mut router_a = Router::builder(ep_a)
            .accept(GOSSIP_ALPN, gossip_a.clone())
            .spawn();
        router_a.endpoint().online().await;
        info!("Peer A started: {}", router_a.endpoint().id().fmt_short());

        // Start peer B
        let ep_b = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .secret_key(secret_key_b.clone())
            .insecure_skip_relay_cert_verify(true)
            .discovery(disco.clone())
            .bind()
            .await?;
        disco.set_endpoint_info(EndpointAddr::new(ep_b.id()).with_relay_url(relay_url.clone()));
        let gossip_b = Gossip::builder().spawn(ep_b.clone());
        let router_b = Router::builder(ep_b)
            .accept(GOSSIP_ALPN, gossip_b.clone())
            .spawn();
        router_b.endpoint().online().await;
        info!("Peer B started: {}", router_b.endpoint().id().fmt_short());

        let id_b = router_b.endpoint().id();
        let id_a = router_a.endpoint().id();

        // A subscribes to topic without bootstrap peers
        let topic_a = gossip_a.subscribe(topic_id, vec![]).await?;
        let (_tx_a, mut rx_a) = topic_a.split();

        // B subscribes and joins using A as bootstrap
        let topic_b = gossip_b.subscribe(topic_id, vec![id_a]).await?;
        let (tx_b, mut rx_b) = topic_b.split();

        // Wait for both to join the mesh
        timeout_2s(rx_a.joined()).await??;
        timeout_2s(rx_b.joined()).await??;
        info!("Both peers joined");

        // Verify they can communicate
        tx_b.broadcast(b"hello from B".to_vec().into()).await?;
        match timeout_2s(rx_a.next()).await? {
            Some(Ok(Event::Received(msg))) => assert_eq!(&msg.content[..], b"hello from B"),
            other => panic!("expected Received event, got {:?}", other),
        }
        info!("Initial communication successful");

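        // repeat the offline/rejoin cycle several times to make sure reconnects
        // keep working back-to-back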
        for i in 0..10 {
            async {
                info!("Shutting down Peer A to simulate going offline");
                // We shutdown the *endpoint* directly. This force-closes all connections
                // without gossip getting a chance to send explicit Disconnect messages.
                router_a.endpoint().close().await;

                // Peer B sees connection being closed
                info!("Waiting for Peer B to detect Peer A going offline");
                match timeout_2s(rx_b.next()).await? {
                    Some(Ok(Event::NeighborDown(peer))) if peer == id_a => {}
                    other => panic!("Peer B received unexpected event: {other:?}"),
                }

                // Peer A comes back online and rejoins using B as bootstrap
                info!("Restarting Peer A");
                let ep_a = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
                    .secret_key(secret_key_a.clone())
                    .discovery(disco.clone())
                    .insecure_skip_relay_cert_verify(true)
                    .bind()
                    .await?;
                gossip_a = Gossip::builder().spawn(ep_a.clone());
                router_a = Router::builder(ep_a)
                    .accept(GOSSIP_ALPN, gossip_a.clone())
                    .spawn();
                router_a.endpoint().online().await;
                info!("Peer A restarted: {}", router_a.endpoint().id().fmt_short());

                // A rejoins using B as bootstrap
                let topic_a = gossip_a.subscribe(topic_id, vec![id_b]).await?;
                let (_tx_a, mut rx_a) = topic_a.split();

                // Both peers should successfully reconnect
                info!("Waiting for Peer A to rejoin");
                timeout_2s(rx_a.joined()).await??;
                info!("Peer A successfully rejoined");

                // Wait for B to see A as neighbor again
                match timeout_2s(rx_b.next()).await? {
                    Some(Ok(Event::NeighborUp(peer))) if peer == id_a => {}
                    other => panic!("Peer B received unexpected event: {other:?}"),
                }

                // Verify they can communicate again
                tx_b.broadcast(format!("hello from B after reconnect {i}").into())
                    .await?;
                let event = match timeout_2s(rx_a.next()).await? {
                    Some(Ok(Event::Received(msg))) => msg,
                    other => panic!("Peer A received unexpected event {other:?}"),
                };
                assert_eq!(
                    &event.content[..],
                    format!("hello from B after reconnect {i}").as_bytes()
                );
                info!("Communication restored after reconnection");
                n0_error::Ok(())
            }
            .instrument(error_span!("round", %i))
            .await?;
        }

        router_a
            .shutdown()
            .await
            .std_context("shutdown router_a2")?;
        router_b.shutdown().await.std_context("shutdown router_b")?;

        Ok(())
    }

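    /// Awaits `fut` for at most two seconds; on timeout, the elapsed error is
    /// converted into this module's `Result` via `std_context`.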
    async fn timeout_2s<T>(fut: impl Future<Output = T>) -> Result<T> {
        tokio::time::timeout(Duration::from_secs(2), fut)
            .await
            .std_context("timeout")
    }
}