iroh_gossip/
net.rs

//! Networking for the `iroh-gossip` protocol

use std::{
    collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
    net::SocketAddr,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use bytes::Bytes;
use futures_concurrency::stream::{stream_group, StreamGroup};
use futures_util::FutureExt as _;
use iroh::{
    endpoint::Connection,
    protocol::{AcceptError, ProtocolHandler},
    Endpoint, EndpointAddr, EndpointId, PublicKey, RelayUrl, Watcher,
};
use irpc::WithChannels;
use n0_error::{e, stack_error};
use n0_future::{
    task::{self, AbortOnDropHandle, JoinSet},
    time::Instant,
    Stream, StreamExt as _,
};
use rand::{rngs::StdRng, SeedableRng};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, trace, warn, Instrument};

use self::{
    address_lookup::GossipAddressLookup,
    util::{RecvLoop, SendLoop, Timers},
};
use crate::{
    api::{self, Command, Event, GossipApi, RpcMessage},
    metrics::Metrics,
    proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
};

mod address_lookup;
mod util;

/// ALPN protocol name
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

/// Channel capacity for the send queue (one per connection)
const SEND_QUEUE_CAP: usize = 64;
/// Channel capacity for the ToActor message queue (single)
const TO_ACTOR_CAP: usize = 64;
/// Channel capacity for the InEvent message queue (single)
const IN_EVENT_CAP: usize = 1024;
/// Channel capacity for broadcast subscriber event queue (one per topic)
const TOPIC_EVENT_CAP: usize = 256;

/// Events emitted from the gossip protocol
pub type ProtoEvent = proto::Event<PublicKey>;
/// Commands for the gossip protocol
pub type ProtoCommand = proto::Command<PublicKey>;

type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;
/// Publish and subscribe on gossiping topics.
///
/// Each topic is a separate broadcast tree with separate memberships.
///
/// A topic has to be joined before you can publish or subscribe on the topic.
/// To join the swarm for a topic, you have to know the [`PublicKey`] of at least one peer that also joined the topic.
///
/// Messages published on the swarm will be delivered to all peers that joined the swarm for that
/// topic. You will also be relaying (gossiping) messages published by other peers.
///
/// With the default settings, the protocol will maintain up to 5 peer connections per topic.
///
/// Even though the [`Gossip`] is created from an [`Endpoint`], it does not accept connections
/// itself. You should run an accept loop on the [`Endpoint`] yourself, check the ALPN protocol of incoming
/// connections, and if the ALPN protocol equals [`GOSSIP_ALPN`], forward the connection to the
/// gossip actor through [`Self::handle_connection`].
///
/// The gossip actor will, however, initiate new connections to other peers by itself.
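///
/// # Example
///
/// A minimal usage sketch (not compiled as a doc-test): it assumes an already
/// bound `endpoint`, a `topic` of type [`TopicId`], and a `bootstrap_peer` id
/// of a peer that has already joined the topic. The router registration shown
/// is one way to run the accept loop described above.
///
/// ```rust,ignore
/// let gossip = Gossip::builder().spawn(endpoint.clone());
/// let _router = iroh::protocol::Router::builder(endpoint)
///     .accept(GOSSIP_ALPN, gossip.clone())
///     .spawn();
/// let (sender, mut receiver) = gossip
///     .subscribe_and_join(topic, vec![bootstrap_peer])
///     .await?
///     .split();
/// sender.broadcast("hello".into()).await?;
/// if let Some(Event::Received(msg)) = receiver.try_next().await? {
///     println!("received: {:?}", msg.content);
/// }
/// ```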
#[derive(Debug, Clone)]
pub struct Gossip {
    pub(crate) inner: Arc<Inner>,
}

impl std::ops::Deref for Gossip {
    type Target = GossipApi;
    fn deref(&self) -> &Self::Target {
        &self.inner.api
    }
}

#[derive(Debug)]
enum LocalActorMessage {
    HandleConnection(Connection),
    Shutdown { reply: oneshot::Sender<()> },
}

#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum Error {
    ActorDropped {},
}

impl<T> From<mpsc::error::SendError<T>> for Error {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(Error::ActorDropped)
    }
}
impl From<oneshot::error::RecvError> for Error {
    fn from(_value: oneshot::error::RecvError) -> Self {
        e!(Error::ActorDropped)
    }
}

#[derive(Debug)]
pub(crate) struct Inner {
    api: GossipApi,
    local_tx: mpsc::Sender<LocalActorMessage>,
    _actor_handle: AbortOnDropHandle<()>,
    max_message_size: usize,
    metrics: Arc<Metrics>,
}

impl ProtocolHandler for Gossip {
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        self.handle_connection(connection)
            .await
            .map_err(AcceptError::from_err)?;
        Ok(())
    }

    async fn shutdown(&self) {
        if let Err(err) = self.shutdown().await {
            warn!("error while shutting down gossip: {err:#}");
        }
    }
}

/// Builder to configure and construct [`Gossip`].
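///
/// A small configuration sketch (the values are illustrative, not
/// recommendations; assumes a bound `endpoint`):
///
/// ```rust,ignore
/// let gossip = Gossip::builder()
///     .max_message_size(8 * 1024)
///     .alpn(b"/my-app/gossip/0")
///     .spawn(endpoint);
/// ```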
#[derive(Debug, Clone)]
pub struct Builder {
    config: proto::Config,
    alpn: Option<Bytes>,
}

impl Builder {
    /// Sets the maximum message size in bytes.
    /// By default this is `4096` bytes.
    pub fn max_message_size(mut self, size: usize) -> Self {
        self.config.max_message_size = size;
        self
    }

    /// Set the membership configuration.
    pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
        self.config.membership = config;
        self
    }

    /// Set the broadcast configuration.
    pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
        self.config.broadcast = config;
        self
    }

    /// Set the ALPN this gossip instance uses.
    ///
    /// It has to be the same for all peers in the network. If you set a custom ALPN,
    /// you have to use the same ALPN when registering the [`Gossip`] on an iroh
    /// router with [`RouterBuilder::accept`].
    ///
    /// [`RouterBuilder::accept`]: iroh::protocol::RouterBuilder::accept
    pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
        self.alpn = Some(alpn.as_ref().to_vec().into());
        self
    }

    /// Spawn a gossip actor and get a handle for it
    pub fn spawn(self, endpoint: Endpoint) -> Gossip {
        let metrics = Arc::new(Metrics::default());
        let address_lookup = GossipAddressLookup::default();
        endpoint.address_lookup().add(address_lookup.clone());
        let (actor, rpc_tx, local_tx) = Actor::new(
            endpoint,
            self.config,
            metrics.clone(),
            self.alpn,
            address_lookup,
        );
        let me = actor.endpoint.id().fmt_short();
        let max_message_size = actor.state.max_message_size();

        let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));

        let api = GossipApi::local(rpc_tx);

        Gossip {
            inner: Inner {
                api,
                local_tx,
                _actor_handle: AbortOnDropHandle::new(actor_handle),
                max_message_size,
                metrics,
            }
            .into(),
        }
    }
}

impl Gossip {
    /// Creates a default `Builder`.
    pub fn builder() -> Builder {
        Builder {
            config: Default::default(),
            alpn: None,
        }
    }

    /// Listen on a quinn endpoint for incoming RPC connections.
    #[cfg(feature = "rpc")]
    pub async fn listen(self, endpoint: quinn::Endpoint) {
        self.inner.api.listen(endpoint).await
    }

    /// Get the maximum message size configured for this gossip actor.
    pub fn max_message_size(&self) -> usize {
        self.inner.max_message_size
    }

    /// Handle an incoming [`Connection`].
    ///
    /// Make sure to check the ALPN protocol yourself before passing the connection.
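    ///
    /// A sketch of such an accept loop (assumes a bound `endpoint` configured
    /// with [`GOSSIP_ALPN`] and a `gossip` handle; `alpn()` returns the
    /// negotiated ALPN of the connection):
    ///
    /// ```rust,ignore
    /// while let Some(incoming) = endpoint.accept().await {
    ///     let connection = incoming.accept()?.await?;
    ///     if connection.alpn().as_deref() == Some(GOSSIP_ALPN) {
    ///         gossip.handle_connection(connection).await?;
    ///     }
    /// }
    /// ```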
    pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
        self.inner
            .local_tx
            .send(LocalActorMessage::HandleConnection(conn))
            .await?;
        Ok(())
    }

    /// Shutdown the gossip instance.
    ///
    /// This leaves all topics, sending `Disconnect` messages to peers, and then
    /// stops the gossip actor loop and drops all state and connections.
    pub async fn shutdown(&self) -> Result<(), Error> {
        let (reply, reply_rx) = oneshot::channel();
        self.inner
            .local_tx
            .send(LocalActorMessage::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }

    /// Returns the metrics tracked for this gossip instance.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.inner.metrics
    }
}

/// Actor that sends and handles messages between the connection and main state loops
struct Actor {
    alpn: Bytes,
    /// Protocol state
    state: proto::State<PublicKey, StdRng>,
    /// The endpoint through which we dial peers
    endpoint: Endpoint,
    /// Dial machine to connect to peers
    dialer: Dialer,
    /// Input messages to the actor
    rpc_rx: mpsc::Receiver<RpcMessage>,
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Sender for the state input (cloned into the connection loops)
    in_event_tx: mpsc::Sender<InEvent>,
    /// Input events to the state (emitted from the connection loops)
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Queued timers
    timers: Timers<Timer>,
    /// Map of topics to their state.
    topics: HashMap<TopicId, TopicState>,
    /// Map of peers to their state.
    peers: HashMap<EndpointId, PeerState>,
    /// Stream of commands from topic handles.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Internal queue of topics to close because all handles were dropped.
    quit_queue: VecDeque<TopicId>,
    /// Tasks for the connection loops, to keep track of panics.
    connection_tasks: JoinSet<(EndpointId, Connection, Result<(), ConnectionLoopError>)>,
    metrics: Arc<Metrics>,
    topic_event_forwarders: JoinSet<TopicId>,
    address_lookup: GossipAddressLookup,
}

impl Actor {
    fn new(
        endpoint: Endpoint,
        config: proto::Config,
        metrics: Arc<Metrics>,
        alpn: Option<Bytes>,
        address_lookup: GossipAddressLookup,
    ) -> (
        Self,
        mpsc::Sender<RpcMessage>,
        mpsc::Sender<LocalActorMessage>,
    ) {
        let peer_id = endpoint.id();
        let dialer = Dialer::new(endpoint.clone());
        let state = proto::State::new(
            peer_id,
            Default::default(),
            config,
            rand::rngs::StdRng::from_rng(&mut rand::rng()),
        );
        let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
        let (local_tx, local_rx) = mpsc::channel(16);
        let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);

        let actor = Actor {
            alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
            endpoint,
            state,
            dialer,
            rpc_rx,
            in_event_rx,
            in_event_tx,
            timers: Timers::new(),
            command_rx: StreamGroup::new().keyed(),
            peers: Default::default(),
            topics: Default::default(),
            quit_queue: Default::default(),
            connection_tasks: Default::default(),
            metrics,
            local_rx,
            topic_event_forwarders: Default::default(),
            address_lookup,
        };

        (actor, rpc_tx, local_tx)
    }

    pub async fn run(mut self) {
        let mut addr_update_stream = self.setup().await;

        let mut i = 0;
        while self.event_loop(&mut addr_update_stream, i).await {
            i += 1;
        }
    }

    /// Performs the initial actor setup to run the [`Actor::event_loop`].
    ///
    /// This publishes our current address as peer data and returns a stream of
    /// subsequent address updates.
    async fn setup(&mut self) -> impl Stream<Item = EndpointAddr> + Send + Unpin + use<> {
        let addr_update_stream = self.endpoint.watch_addr().stream();
        let initial_addr = self.endpoint.addr();
        self.handle_addr_update(initial_addr).await;
        addr_update_stream
    }

    /// One event loop processing step.
    ///
    /// Returns `false` when no further processing should be performed.
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = EndpointAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        tokio::select! {
            biased;
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        self.handle_connection(conn.remote_id(), ConnOrigin::Accept, conn);
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    None => {
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                let now = Instant::now();
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }

    async fn handle_addr_update(&mut self, endpoint_addr: EndpointAddr) {
        let peer_data = encode_peer_data(&endpoint_addr.into());
        self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
            .await
    }

    async fn handle_command(
        &mut self,
        topic: TopicId,
        key: stream_group::Key,
        command: Option<Command>,
    ) {
        debug!(?topic, ?key, ?command, "handle command");
        let Some(state) = self.topics.get_mut(&topic) else {
            // TODO: unreachable?
            warn!("received command for unknown topic");
            return;
        };
        match command {
            Some(command) => {
                let command = match command {
                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
                    Command::BroadcastNeighbors(message) => {
                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                    }
                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
                };
                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
                    .await;
            }
            None => {
                state.command_rx_keys.remove(&key);
                if !state.still_needed() {
                    self.quit_queue.push_back(topic);
                    self.process_quit_queue().await;
                }
            }
        }
    }

    fn handle_connection(&mut self, peer_id: EndpointId, origin: ConnOrigin, conn: Connection) {
        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
        let conn_id = conn.stable_id();

        let queue = match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
            Entry::Vacant(entry) => {
                entry.insert(PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                });
                Vec::new()
            }
        };

        let max_message_size = self.state.max_message_size();
        let in_event_tx = self.in_event_tx.clone();

        // Spawn a task for this connection
        self.connection_tasks.spawn(
            async move {
                let res = connection_loop(
                    peer_id,
                    conn.clone(),
                    origin,
                    send_rx,
                    in_event_tx,
                    max_message_size,
                    queue,
                )
                .await;
                (peer_id, conn, res)
            }
            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
        );
    }

    #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
    async fn handle_connection_task_finished(
        &mut self,
        peer_id: EndpointId,
        conn: Connection,
        task_result: Result<(), ConnectionLoopError>,
    ) {
        if conn.close_reason().is_none() {
            conn.close(0u32.into(), b"close from disconnect");
        }
        let reason = conn.close_reason().expect("just closed");
        let error = task_result.err();
        debug!(%reason, ?error, "connection closed");
        if let Some(PeerState::Active {
            active_conn_id,
            other_conns,
            ..
        }) = self.peers.get_mut(&peer_id)
        {
            if conn.stable_id() == *active_conn_id {
                debug!("active send connection closed, mark peer as disconnected");
                self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                    .await;
            } else {
                other_conns.retain(|x| *x != conn.stable_id());
                debug!("remaining {} other connections", other_conns.len());
            }
        } else {
            debug!("peer already marked as disconnected");
        }
    }

    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
        trace!("handle to_actor {msg:?}");
        match msg {
            RpcMessage::Join(msg) => {
                let WithChannels {
                    inner,
                    rx,
                    tx,
                    // TODO(frando): make use of span?
                    span: _,
                } = msg;
                let api::JoinRequest {
                    topic_id,
                    bootstrap,
                } = inner;
                let TopicState {
                    neighbors,
                    event_sender,
                    command_rx_keys,
                } = self.topics.entry(topic_id).or_default();
                let mut sender_dead = false;
                if !neighbors.is_empty() {
                    for neighbor in neighbors.iter() {
                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
                            sender_dead = true;
                            break;
                        }
                    }
                }

                if !sender_dead {
                    let fut =
                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
                    self.topic_event_forwarders
                        .spawn(fut.instrument(tracing::Span::current()));
                }
                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
                let key = self.command_rx.insert(command_rx);
                command_rx_keys.insert(key);

                self.handle_in_event(
                    InEvent::Command(
                        topic_id,
                        ProtoCommand::Join(bootstrap.into_iter().collect()),
                    ),
                    now,
                )
                .await;
            }
        }
    }

    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }

    async fn process_quit_queue(&mut self) {
        while let Some(topic_id) = self.quit_queue.pop_front() {
            self.handle_in_event_inner(
                InEvent::Command(topic_id, ProtoCommand::Quit),
                Instant::now(),
            )
            .await;
            if self.topics.remove(&topic_id).is_some() {
                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
            }
        }
    }

    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                // Removing the peer is handled by the in_event PeerDisconnected sent
                                // in [`Self::handle_connection_task_finished`].
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        // TODO: unreachable?
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    // signal disconnection by dropping the senders to the connection
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(endpoint_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {endpoint_id}: {err}"),
                    Ok(info) => {
                        debug!(peer = ?endpoint_id, "add known addrs: {info:?}");
                        let mut endpoint_addr = EndpointAddr::new(endpoint_id);
                        for addr in info.direct_addresses {
                            endpoint_addr = endpoint_addr.with_ip_addr(addr);
                        }
                        if let Some(relay_url) = info.relay_url {
                            endpoint_addr = endpoint_addr.with_relay_url(relay_url);
                        }

                        self.address_lookup.add(endpoint_addr);
                    }
                },
            }
        }
    }
}

type ConnId = usize;

#[derive(Debug)]
enum PeerState {
    Pending {
        queue: Vec<ProtoMessage>,
    },
    Active {
        active_send_tx: mpsc::Sender<ProtoMessage>,
        active_conn_id: ConnId,
        other_conns: Vec<ConnId>,
    },
}

impl PeerState {
    fn accept_conn(
        &mut self,
        send_tx: mpsc::Sender<ProtoMessage>,
        conn_id: ConnId,
    ) -> Vec<ProtoMessage> {
        match self {
            PeerState::Pending { queue } => {
                let queue = std::mem::take(queue);
                *self = PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                };
                queue
            }
            PeerState::Active {
                active_send_tx,
                active_conn_id,
                other_conns,
            } => {
                // We already have an active connection. We keep the old connection intact,
                // but only use the new connection for sending from now on.
                // By dropping the `send_tx` of the old connection, the send loop part of
                // the `connection_loop` of the old connection will terminate, which will also
                // notify the peer that the old connection may be dropped.
                other_conns.push(*active_conn_id);
                *active_send_tx = send_tx;
                *active_conn_id = conn_id;
                Vec::new()
            }
        }
    }
}

impl Default for PeerState {
    fn default() -> Self {
        PeerState::Pending { queue: Vec::new() }
    }
}

#[derive(Debug)]
struct TopicState {
    neighbors: BTreeSet<EndpointId>,
    event_sender: broadcast::Sender<ProtoEvent>,
    /// Keys identifying command receivers in [`Actor::command_rx`].
    ///
    /// This represents the receiver side of gossip's publish public API.
    command_rx_keys: HashSet<stream_group::Key>,
}

impl Default for TopicState {
    fn default() -> Self {
        let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
        Self {
            neighbors: Default::default(),
            command_rx_keys: Default::default(),
            event_sender,
        }
    }
}

impl TopicState {
    /// Check if the topic still has any publisher or subscriber.
    fn still_needed(&self) -> bool {
        // Keep topic alive if either senders or receivers exist.
        // Using || prevents topic closure when senders are dropped while receivers listen.
        !self.command_rx_keys.is_empty() || self.event_sender.receiver_count() > 0
    }

    #[cfg(test)]
    fn joined(&self) -> bool {
        !self.neighbors.is_empty()
    }
}

/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    Accept,
    Dial,
}

#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
enum ConnectionLoopError {
    #[error(transparent)]
    Write {
        source: self::util::WriteError,
    },
    #[error(transparent)]
    Read {
        source: self::util::ReadError,
    },
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: iroh::endpoint::ConnectionError,
    },
    ActorDropped {},
}

impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(ConnectionLoopError::ActorDropped)
    }
}

async fn connection_loop(
    from: PublicKey,
    conn: Connection,
    origin: ConnOrigin,
    send_rx: mpsc::Receiver<ProtoMessage>,
    in_event_tx: mpsc::Sender<InEvent>,
    max_message_size: usize,
    queue: Vec<ProtoMessage>,
) -> Result<(), ConnectionLoopError> {
    debug!(?origin, "connection established");

    let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
    let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);

    let send_fut = send_loop.run(queue).instrument(error_span!("send"));
    let recv_fut = recv_loop.run().instrument(error_span!("recv"));

    let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
    send_res?;
    recv_res?;
    Ok(())
}

#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct AddrInfo {
    relay_url: Option<RelayUrl>,
    direct_addresses: BTreeSet<SocketAddr>,
}

impl From<EndpointAddr> for AddrInfo {
    fn from(endpoint_addr: EndpointAddr) -> Self {
        Self {
            relay_url: endpoint_addr.relay_urls().next().cloned(),
            direct_addresses: endpoint_addr.ip_addrs().cloned().collect(),
        }
    }
}

fn encode_peer_data(info: &AddrInfo) -> PeerData {
    let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo may not fail");
    PeerData::new(bytes)
}

fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
    let bytes = peer_data.as_bytes();
    if bytes.is_empty() {
        return Ok(AddrInfo::default());
    }
    let info = postcard::from_bytes(bytes)?;
    Ok(info)
}
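
// A small round-trip sketch for the peer data encoding above. It only uses
// items defined in this module; the test name and module are illustrative,
// not part of the original test suite.
#[cfg(test)]
mod peer_data_tests {
    use super::*;

    #[test]
    fn peer_data_roundtrip() {
        let info = AddrInfo {
            relay_url: None,
            direct_addresses: ["127.0.0.1:4433".parse().unwrap()].into_iter().collect(),
        };
        // Encode to postcard bytes and decode back; fields should survive.
        let decoded = decode_peer_data(&encode_peer_data(&info)).unwrap();
        assert_eq!(decoded.direct_addresses, info.direct_addresses);
        assert!(decoded.relay_url.is_none());

        // Empty peer data decodes to the default (empty) `AddrInfo`.
        let empty = decode_peer_data(&PeerData::new(Vec::new())).unwrap();
        assert!(empty.direct_addresses.is_empty());
    }
}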

async fn topic_subscriber_loop(
    sender: irpc::channel::mpsc::Sender<Event>,
    mut topic_events: broadcast::Receiver<ProtoEvent>,
) {
    loop {
        tokio::select! {
           biased;
           msg = topic_events.recv() => {
               let event = match msg {
                   Err(broadcast::error::RecvError::Closed) => break,
                   Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
                   Ok(event) => event.into(),
               };
               if sender.send(event).await.is_err() {
                   break;
               }
           }
           _ = sender.closed() => break,
        }
    }
}

/// A stream of commands for a gossip subscription.
type BoxedCommandReceiver =
    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;

#[derive(derive_more::Debug)]
struct TopicCommandStream {
    topic_id: TopicId,
    #[debug("CommandStream")]
    stream: BoxedCommandReceiver,
    closed: bool,
}

impl TopicCommandStream {
    fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
        Self {
            topic_id,
            stream,
            closed: false,
        }
    }
}

impl Stream for TopicCommandStream {
    type Item = (TopicId, Option<Command>);
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.closed {
            return Poll::Ready(None);
        }
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
            Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
                self.closed = true;
                Poll::Ready(Some((self.topic_id, None)))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    pending: JoinSet<(
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    )>,
    pending_dials: HashMap<EndpointId, CancellationToken>,
}

impl Dialer {
    /// Create a new dialer for an [`Endpoint`]
    fn new(endpoint: Endpoint) -> Self {
        Self {
            endpoint,
            pending: Default::default(),
            pending_dials: Default::default(),
        }
    }

    /// Starts to dial an endpoint by [`EndpointId`].
    fn queue_dial(&mut self, endpoint_id: EndpointId, alpn: Bytes) {
        if self.is_pending(endpoint_id) {
            return;
        }
        let cancel = CancellationToken::new();
        self.pending_dials.insert(endpoint_id, cancel.clone());
        let endpoint = self.endpoint.clone();
        self.pending.spawn(
            async move {
                let res = tokio::select! {
                    biased;
                    _ = cancel.cancelled() => None,
                    res = endpoint.connect(endpoint_id, &alpn) => Some(res),
                };
                (endpoint_id, res)
            }
            .instrument(tracing::Span::current()),
        );
    }

    /// Checks if an endpoint is currently being dialed.
    fn is_pending(&self, endpoint: EndpointId) -> bool {
        self.pending_dials.contains_key(&endpoint)
    }

    /// Waits for the next dial operation to complete.
    /// `None` means disconnected
    async fn next_conn(
        &mut self,
    ) -> (
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    ) {
        match self.pending_dials.is_empty() {
            false => {
                let (endpoint_id, res) = loop {
                    match self.pending.join_next().await {
                        Some(Ok((endpoint_id, res))) => {
                            self.pending_dials.remove(&endpoint_id);
                            break (endpoint_id, res);
                        }
                        Some(Err(e)) => {
                            error!("next conn error: {:?}", e);
                        }
                        None => {
                            error!("no more pending conns available");
                            std::future::pending().await
                        }
                    }
                };

                (endpoint_id, res)
            }
            true => std::future::pending().await,
        }
    }
}

#[cfg(test)]
pub(crate) mod test {
    use std::time::Duration;

    use bytes::Bytes;
    use futures_concurrency::future::TryJoin;
    use iroh::{
        address_lookup::memory::MemoryLookup, endpoint::BindError, protocol::Router, RelayMap,
        RelayMode, SecretKey,
    };
    use n0_error::{AnyError, Result, StdResultExt};
    use n0_tracing_test::traced_test;
    use rand::{CryptoRng, Rng};
    use tokio::{spawn, time::timeout};
    use tokio_util::sync::CancellationToken;
    use tracing::{info, instrument};

    use super::*;
    use crate::api::{ApiError, GossipReceiver, GossipSender};

    struct ManualActorLoop {
        actor: Actor,
        step: usize,
    }

    impl std::ops::Deref for ManualActorLoop {
        type Target = Actor;

        fn deref(&self) -> &Self::Target {
            &self.actor
        }
    }

    impl std::ops::DerefMut for ManualActorLoop {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.actor
        }
    }

    type EndpointHandle = tokio::task::JoinHandle<Result<()>>;

    impl ManualActorLoop {
        #[instrument(skip_all, fields(me = %actor.endpoint.id().fmt_short()))]
        async fn new(mut actor: Actor) -> Self {
            let _ = actor.setup().await;
            Self { actor, step: 0 }
        }

        #[instrument(skip_all, fields(me = %self.endpoint.id().fmt_short()))]
        async fn step(&mut self) -> bool {
            let ManualActorLoop { actor, step } = self;
            *step += 1;
            // Ignore updates that change our published address. This gives us better control
            // over events since the endpoint is no longer emitting changes.
            let addr_update_stream = &mut futures_lite::stream::pending();
            actor.event_loop(addr_update_stream, *step).await
        }

        async fn steps(&mut self, n: usize) {
            for _ in 0..n {
                self.step().await;
            }
        }

        async fn finish(mut self) {
            while self.step().await {}
        }
    }

    impl Gossip {
        /// Creates a testing gossip instance and its actor without spawning it.
        ///
        /// This creates the endpoint and spawns the endpoint loop as well. The handle for the
        /// endpoint task is returned along with the gossip instance and actor. Since the actor
        /// is not actually spawned as [`Builder::spawn`] would, the gossip instance will have a
        /// handle to a dummy task instead.
        async fn t_new_with_actor(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Actor, EndpointHandle), BindError> {
            let endpoint = create_endpoint(rng, relay_map, None).await?;
            let metrics = Arc::new(Metrics::default());
            let address_lookup = GossipAddressLookup::default();
            endpoint.address_lookup().add(address_lookup.clone());

            let (actor, to_actor_tx, conn_tx) =
                Actor::new(endpoint, config, metrics.clone(), None, address_lookup);
            let max_message_size = actor.state.max_message_size();

            let _actor_handle =
                AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
            let gossip = Self {
                inner: Inner {
                    api: GossipApi::local(to_actor_tx),
                    local_tx: conn_tx,
                    _actor_handle,
                    max_message_size,
                    metrics,
                }
                .into(),
            };

            let endpoint_task = task::spawn(endpoint_loop(
                actor.endpoint.clone(),
                gossip.clone(),
                cancel.child_token(),
            ));

            Ok((gossip, actor, endpoint_task))
        }

        /// Creates a new testing gossip instance with the normal actor loop.
        async fn t_new(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
            let (g, actor, ep_handle) =
                Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
            let ep = actor.endpoint.clone();
            let me = ep.id().fmt_short();
            let actor_handle =
                task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
            Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
        }
    }

    pub(crate) async fn create_endpoint(
        rng: &mut rand_chacha::ChaCha12Rng,
        relay_map: RelayMap,
        memory_lookup: Option<MemoryLookup>,
    ) -> Result<Endpoint, BindError> {
        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .secret_key(SecretKey::generate(rng))
            .alpns(vec![GOSSIP_ALPN.to_vec()])
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .await?;

        if let Some(memory_lookup) = memory_lookup {
            ep.address_lookup().add(memory_lookup);
        }
        ep.online().await;
        Ok(ep)
    }

    async fn endpoint_loop(
        endpoint: Endpoint,
        gossip: Gossip,
        cancel: CancellationToken,
    ) -> Result<()> {
        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => break,
                incoming = endpoint.accept() => match incoming {
                    None => break,
                    Some(incoming) => {
                        let connecting = match incoming.accept() {
                            Ok(connecting) => connecting,
                            Err(err) => {
                                warn!("incoming connection failed: {err:#}");
                                // we can carry on in these cases:
                                // this can be caused by retransmitted datagrams
                                continue;
                            }
                        };
                        let connection = connecting
                            .await
                            .std_context("await incoming connection")?;
                        gossip.handle_connection(connection).await?
                    }
                }
            }
        }
        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn gossip_net_smoke() {
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let memory_lookup = MemoryLookup::new();

        let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();

        let go1 = Gossip::builder().spawn(ep1.clone());
        let go2 = Gossip::builder().spawn(ep2.clone());
        let go3 = Gossip::builder().spawn(ep3.clone());
        debug!("peer1 {:?}", ep1.id());
        debug!("peer2 {:?}", ep2.id());
        debug!("peer3 {:?}", ep3.id());
        let pi1 = ep1.id();
        let pi2 = ep2.id();

        let cancel = CancellationToken::new();
        let tasks = [
            spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
            spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
            spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
        ];

        debug!("----- adding peers  ----- ");
        let topic: TopicId = blake3::hash(b"foobar").into();

        let addr1 = EndpointAddr::new(pi1).with_relay_url(relay_url.clone());
        let addr2 = EndpointAddr::new(pi2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr1.clone());
        memory_lookup.add_endpoint_info(addr2.clone());

        debug!("----- joining  ----- ");
        // join the topics and wait for the connection to succeed
        let [sub1, mut sub2, mut sub3] = [
            go1.subscribe_and_join(topic, vec![]),
            go2.subscribe_and_join(topic, vec![pi1]),
            go3.subscribe_and_join(topic, vec![pi2]),
        ]
        .try_join()
        .await
        .unwrap();

        let (sink1, _stream1) = sub1.split();

        let len = 2;

        // publish messages on endpoint1
        let pub1 = spawn(async move {
            for i in 0..len {
                let message = format!("hi{i}");
                info!("go1 broadcast: {message:?}");
                sink1.broadcast(message.into_bytes().into()).await.unwrap();
                tokio::time::sleep(Duration::from_micros(1)).await;
            }
        });

        // wait for messages on endpoint2
        let sub2 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub2.next().await.unwrap().unwrap();
                info!("go2 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        // wait for messages on endpoint3
        let sub3 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub3.next().await.unwrap().unwrap();
                info!("go3 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        timeout(Duration::from_secs(10), pub1)
            .await
            .unwrap()
            .unwrap();
        let recv2 = timeout(Duration::from_secs(10), sub2)
            .await
            .unwrap()
            .unwrap();
        let recv3 = timeout(Duration::from_secs(10), sub3)
            .await
            .unwrap()
            .unwrap();

        // We assert the received messages, but not their order.
        // While commonly they will be received in-order, for go3 it may happen
        // that the second message arrives before the first one, because it managed to
        // forward-join go1 before the second message is published.
        let expected: HashSet<Bytes> = (0..len)
            .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
            .collect();
        assert_eq!(HashSet::from_iter(recv2), expected);
        assert_eq!(HashSet::from_iter(recv3), expected);

        cancel.cancel();
        for t in tasks {
            timeout(Duration::from_secs(10), t)
                .await
                .unwrap()
                .unwrap()
                .unwrap();
        }
    }

    /// Test that when a gossip topic is no longer needed it's actually unsubscribed.
    ///
    /// This test will:
    /// - Create two endpoints, the first using a manual event loop.
    /// - Subscribe both endpoints to the same topic. The first endpoint will subscribe twice and connect
    ///   to the second endpoint. The second endpoint will subscribe without bootstrap.
    /// - Ensure that the first endpoint removes the subscription iff all topic handles have been
    ///   dropped.
    // NOTE: this is a regression test.
    #[tokio::test]
    #[traced_test]
    async fn subscription_cleanup() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        // create the first endpoint with a manual actor loop
        let (go1, actor, ep1_handle) =
            Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
        let mut actor = ManualActorLoop::new(actor).await;

        // create the second endpoint with the usual actor loop
        let (go2, ep2, ep2_handle, _test_actor_handle) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = actor.endpoint.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
        tracing::info!(%topic, "joining");

        // create the tasks for each gossip instance:
        // - second endpoint subscribes once without bootstrap and listens to events
        // - first endpoint subscribes twice with the second endpoint as bootstrap. This is done on command
        //   from the main task (this)

        // second endpoint
        let ct2 = ct.clone();
        let go2_task = async move {
            let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();

            let subscribe_fut = async {
                while let Some(ev) = sub_rx.try_next().await? {
                    match ev {
                        Event::Lagged => tracing::debug!("missed some messages :("),
                        Event::Received(_) => unreachable!("test does not send messages"),
                        other => tracing::debug!(?other, "gs event"),
                    }
                }

                tracing::debug!("subscribe stream ended");
                Ok::<_, AnyError>(())
            };

            tokio::select! {
                _ = ct2.cancelled() => Ok(()),
                res = subscribe_fut => res,
            }
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        // first endpoint
        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr2);
        actor.endpoint.address_lookup().add(memory_lookup);
        // we use a channel to signal advancing steps to the task
        let (tx, mut rx) = mpsc::channel::<()>(1);
        let ct1 = ct.clone();
        let go1_task = async move {
            // first subscribe is done immediately
            tracing::info!("subscribing the first time");
            let sub_1a = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;

            // wait for signal to subscribe a second time
            rx.recv().await.expect("signal for second subscribe");
            tracing::info!("subscribing a second time");
            let sub_1b = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
            drop(sub_1a);

            // wait for signal to drop the second handle as well
            rx.recv().await.expect("signal to drop handles");
1461            tracing::info!("dropping all handles");
1462            drop(sub_1b);
1463
1464            // wait for cancellation
1465            ct1.cancelled().await;
1466            drop(go1);
1467
1468            Ok::<_, AnyError>(())
1469        }
1470        .instrument(tracing::debug_span!("endpoint_1", %endpoint_id1));
1471        let go1_handle = task::spawn(go1_task);
1472
1473        // advance and check that the topic is now subscribed
1474        actor.steps(3).await; // handle our subscribe;
1475                              // get peer connection;
1476                              // receive the other peer's information for a NeighborUp
1477        let state = actor.topics.get(&topic).expect("get registered topic");
1478        assert!(state.joined());
1479
1480        // signal the second subscribe, we should remain subscribed
1481        tx.send(())
1482            .await
1483            .std_context("signal additional subscribe")?;
1484        actor.steps(3).await; // subscribe; first receiver gone; first sender gone
1485        let state = actor.topics.get(&topic).expect("get registered topic");
1486        assert!(state.joined());
1487
1488        // signal to drop the second handle, the topic should no longer be subscribed
1489        tx.send(()).await.std_context("signal drop handles")?;
1490        actor.steps(2).await; // second receiver gone; second sender gone
1491        assert!(!actor.topics.contains_key(&topic));
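        // (In other words: the topic entry survives as long as at least one
        // subscriber handle is alive, and is removed once the last
        // receiver/sender pair has been dropped.)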

        // cleanup and ensure everything went as expected
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go1_handle)
            .await
            .std_context("wait gossip1 task")?
            .std_context("join gossip1 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;
        timeout(wait, actor.finish())
            .await
            .std_context("wait actor finish")?;

        Ok(())
    }

    /// Test that endpoints can reconnect to each other.
    ///
    /// This test creates two endpoints subscribed to the same topic. The second endpoint
    /// unsubscribes and then resubscribes; the connection between the endpoints should
    /// succeed both times.
    // NOTE: This is a regression test
    #[tokio::test]
    #[traced_test]
    async fn can_reconnect() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let (go1, ep1, ep1_handle, _test_actor_handle1) =
            Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;

        let (go2, ep2, ep2_handle, _test_actor_handle2) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = ep1.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"can_reconnect").into();
        tracing::info!(%topic, "joining");

        let ct2 = ct.child_token();
        // channel used to signal the second gossip instance to advance the test
        let (tx, mut rx) = mpsc::channel::<()>(1);
        let addr1 = EndpointAddr::new(endpoint_id1).with_relay_url(relay_url.clone());
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr1);
        ep2.address_lookup().add(memory_lookup.clone());
        let go2_task = async move {
            let mut sub = go2.subscribe(topic, Vec::new()).await?;
            sub.joined().await?;

            rx.recv().await.expect("signal to unsubscribe");
            tracing::info!("unsubscribing");
            drop(sub);

            rx.recv().await.expect("signal to subscribe again");
            tracing::info!("resubscribing");
            let mut sub = go2.subscribe(topic, vec![endpoint_id1]).await?;

            sub.joined().await?;
            tracing::info!("subscription successful!");

            ct2.cancelled().await;

            Ok::<_, ApiError>(())
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr2);
        ep1.address_lookup().add(memory_lookup);

        let mut sub = go1.subscribe(topic, vec![endpoint_id2]).await?;
        // wait for the subscribed notification
        sub.joined().await?;

        // signal endpoint_2 to unsubscribe
        tx.send(()).await.std_context("signal unsubscribe")?;

        // we should receive a NeighborDown event
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor down")??;
        assert_eq!(ev, Some(Event::NeighborDown(endpoint_id2)));
        tracing::info!("endpoint 2 left");

        // signal endpoint_2 to subscribe again
        tx.send(()).await.std_context("signal resubscribe")?;

        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor up")??;
        assert_eq!(ev, Some(Event::NeighborUp(endpoint_id2)));
        tracing::info!("endpoint 2 rejoined!");

        // cleanup and ensure everything went as expected
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;

        Result::Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn can_die_and_reconnect() -> Result {
        /// Runs a future in a separate runtime on a separate thread, cancelling everything
        /// abruptly once `cancel` is invoked.
        fn run_in_thread<T: Send + 'static>(
            cancel: CancellationToken,
            fut: impl std::future::Future<Output = T> + Send + 'static,
        ) -> std::thread::JoinHandle<Option<T>> {
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(async move { cancel.run_until_cancelled(fut).await })
            })
        }
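
        // A minimal sketch of the contract this helper relies on (illustrative
        // only, not part of the test): `CancellationToken::run_until_cancelled`
        // yields `None` when the token fires first and `Some(output)` when the
        // future completes first. The asserts at the end of this test depend on
        // the `None` case:
        //
        //     let ct = CancellationToken::new();
        //     ct.cancel();
        //     let out = ct.run_until_cancelled(std::future::pending::<()>()).await;
        //     assert!(out.is_none());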

        /// Spawns a new endpoint and gossip instance.
        async fn spawn_gossip(
            secret_key: SecretKey,
            relay_map: RelayMap,
        ) -> Result<(Router, Gossip), BindError> {
            let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
                .secret_key(secret_key)
                .insecure_skip_relay_cert_verify(true)
                .bind()
                .await?;
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            Ok((router, gossip))
        }

        /// Spawns a gossip endpoint, broadcasts a single message, then sleeps until
        /// cancelled externally.
        async fn broadcast_once(
            secret_key: SecretKey,
            relay_map: RelayMap,
            bootstrap_addr: EndpointAddr,
            topic_id: TopicId,
            message: String,
        ) -> Result {
            let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
            info!(endpoint_id = %router.endpoint().id().fmt_short(), "broadcast endpoint spawned");
            let bootstrap = vec![bootstrap_addr.id];
            let memory_lookup = MemoryLookup::new();
            memory_lookup.add_endpoint_info(bootstrap_addr);
            router.endpoint().address_lookup().add(memory_lookup);
            let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
            topic.broadcast(message.as_bytes().to_vec().into()).await?;
            std::future::pending::<()>().await;
            Ok(())
        }

        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let topic_id = TopicId::from_bytes(rng.random());

        // spawn a gossip endpoint, send the endpoint's address on addr_tx,
        // then forward every received broadcast message on msgs_recv_tx.
        let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
        let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
        let recv_task = tokio::task::spawn({
            let relay_map = relay_map.clone();
            let secret_key = SecretKey::generate(&mut rng);
            async move {
                let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
                // wait for the relay to be set. iroh currently has issues when trying
                // to immediately reconnect with changed direct addresses, but when the
                // relay path is available it works.
                // See https://github.com/n0-computer/iroh/pull/3372
                router.endpoint().online().await;
                let addr = router.endpoint().addr();
                info!(endpoint_id = %addr.id.fmt_short(), "recv endpoint spawned");
                addr_tx.send(addr).unwrap();
                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
                while let Some(event) = topic.try_next().await.unwrap() {
                    if let Event::Received(message) = event {
                        let message = std::str::from_utf8(&message.content)
                            .std_context("decode broadcast message")?
                            .to_string();
                        msgs_recv_tx
                            .send(message)
                            .await
                            .std_context("forward received message")?;
                    }
                }
                Ok::<_, AnyError>(())
            }
        });

        let endpoint0_addr = addr_rx.await.std_context("receive endpoint address")?;
        let max_wait = Duration::from_secs(5);

        // spawn an endpoint, send a message, and then terminate the endpoint ungracefully
        // after the message was received on our receiver endpoint.
        let cancel = CancellationToken::new();
        let secret = SecretKey::generate(&mut rng);
        let join_handle_1 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg1".to_string(),
            ),
        );
        // assert that we received the message on the receiver endpoint.
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for first broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg1");
        info!("kill broadcast endpoint");
        cancel.cancel();

        // spawn the endpoint again with the same endpoint id, and send another message
        let cancel = CancellationToken::new();
        let join_handle_2 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg2".to_string(),
            ),
        );
        // assert that we received the message on the receiver endpoint.
        // this means that the reconnect with the same endpoint id worked.
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for second broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg2");
        info!("kill broadcast endpoint");
        cancel.cancel();

        info!("kill recv endpoint");
        recv_task.abort();
        assert!(join_handle_1.join().unwrap().is_none());
        assert!(join_handle_2.join().unwrap().is_none());

        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn gossip_change_alpn() -> n0_error::Result<()> {
        let alpn = b"my-gossip-alpn";
        let topic_id = TopicId::from([0u8; 32]);

        let ep1 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let ep2 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
        let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
        let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
        let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();

        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr1);
        router2.endpoint().address_lookup().add(memory_lookup);

        let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        timeout(Duration::from_secs(3), topic1.joined())
            .await
            .std_context("wait topic1 join")??;
        timeout(Duration::from_secs(3), topic2.joined())
            .await
            .std_context("wait topic2 join")??;
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }
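
    // Note on the test above: the custom ALPN must be used consistently on both
    // sides: `Gossip::builder().alpn(..)` sets the ALPN the gossip actor dials
    // with, while `Router::accept(alpn, ..)` routes incoming connections with
    // that ALPN to the handler. If the two disagreed, dials would be expected
    // to fail ALPN negotiation.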

    #[tokio::test]
    #[traced_test]
    async fn gossip_rely_on_gossip_address_lookup() -> n0_error::Result<()> {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);

        async fn spawn(
            rng: &mut impl CryptoRng,
        ) -> n0_error::Result<(EndpointId, Router, Gossip, GossipSender, GossipReceiver)> {
            let topic_id = TopicId::from([0u8; 32]);
            let ep = Endpoint::empty_builder(RelayMode::Disabled)
                .secret_key(SecretKey::generate(rng))
                .bind()
                .await?;
            let endpoint_id = ep.id();
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            let topic = gossip.subscribe(topic_id, vec![]).await?;
            let (sender, receiver) = topic.split();
            Ok((endpoint_id, router, gossip, sender, receiver))
        }

        // spawn 3 endpoints without relay or address lookup
        let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
        let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
        let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;

        println!("endpoints {:?}", [n1, n2, n3]);

        // create a memory lookup that only has endpoint1's addr info set
        let addr1 = r1.endpoint().addr();
        let lookup = MemoryLookup::new();
        lookup.add_endpoint_info(addr1);

        // add addr info of endpoint1 to endpoint2 and join endpoint1
        r2.endpoint().address_lookup().add(lookup.clone());
        tx2.join_peers(vec![n1]).await?;

        // await the join endpoint2 -> endpoint1
        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // add addr info of endpoint1 to endpoint3 and join endpoint1
        r3.endpoint().address_lookup().add(lookup.clone());
        tx3.join_peers(vec![n1]).await?;

        // await the joins at endpoint3: n1 and n2.
        // n2 only works because we use the gossip address lookup!
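        // (A sketch of the mechanism, as wired up in this crate: gossip
        // registers its own address lookup with the endpoint, so addressing
        // info learned through gossip membership messages can be used to dial
        // peers whose addresses were never configured directly.)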
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 first neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 second neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));

        assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));

        let ev = timeout(Duration::from_secs(3), rx2.next())
            .await
            .std_context("wait rx2 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        let ev = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait rx1 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown())
            .std_context("shutdown routers")?;
        Ok(())
    }

    fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
        let mut out: Vec<_> = input.into_iter().collect();
        out.sort();
        out
    }

    /// Test that dropping the sender doesn't close the topic while the receiver
    /// is still listening.
    ///
    /// This is a common footgun: users split a `GossipTopic`, drop the sender
    /// early, and expect the receiver to keep working. With the bug (using `&&`
    /// in `still_needed`), the topic closes immediately when the sender is dropped.
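    //
    // A minimal sketch of the corrected liveness check (field names are
    // illustrative, not the actual ones in the topic state): a topic is still
    // needed while *either* half of any subscription is alive, so the check
    // must OR the two sides rather than AND them:
    //
    //     fn still_needed(&self) -> bool {
    //         !self.receivers.is_empty() || !self.senders.is_empty()
    //     }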
    #[tokio::test]
    #[traced_test]
    async fn topic_stays_alive_after_sender_drop() -> n0_error::Result<()> {
        let topic_id = TopicId::from([99u8; 32]);

        let ep1 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let ep2 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let gossip1 = Gossip::builder().spawn(ep1.clone());
        let gossip2 = Gossip::builder().spawn(ep2.clone());
        let router1 = Router::builder(ep1)
            .accept(crate::ALPN, gossip1.clone())
            .spawn();
        let router2 = Router::builder(ep2)
            .accept(crate::ALPN, gossip2.clone())
            .spawn();

        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let mem_lookup = MemoryLookup::new();
        mem_lookup.add_endpoint_info(addr1);
        router2.endpoint().address_lookup().add(mem_lookup);

        let topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        let (tx1, mut rx1) = topic1.split();
        let (tx2, mut rx2) = topic2.split();

        // wait for the mesh to form
        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // endpoint 1 drops its sender, simulating the footgun where the user
        // drops the sender early
        drop(tx1);

        // endpoint 2 sends a message; the receiver on endpoint 1 should still get it
        tx2.broadcast(b"hello from node2".to_vec().into()).await?;

        // endpoint 1's receiver should still work and receive the message
        let event = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait for message on rx1")?;

        match event {
            Some(Ok(Event::Received(msg))) => {
                assert_eq!(&msg.content[..], b"hello from node2");
            }
            other => panic!("expected Received event, got {:?}", other),
        }

        drop(tx2);
        drop(rx1);
        drop(rx2);
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }
}