1use std::{
4 collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
5 net::SocketAddr,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_concurrency::stream::{stream_group, StreamGroup};
13use futures_util::FutureExt as _;
14use iroh::{
15 endpoint::Connection,
16 protocol::{AcceptError, ProtocolHandler},
17 Endpoint, NodeAddr, NodeId, PublicKey, RelayUrl, Watcher,
18};
19use irpc::WithChannels;
20use n0_future::{
21 task::{self, AbortOnDropHandle, JoinSet},
22 time::Instant,
23 Stream, StreamExt as _,
24};
25use nested_enum_utils::common_fields;
26use rand::{rngs::StdRng, SeedableRng};
27use serde::{Deserialize, Serialize};
28use snafu::Snafu;
29use tokio::sync::{broadcast, mpsc, oneshot};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, error_span, trace, warn, Instrument};
32
33use self::{
34 discovery::GossipDiscovery,
35 util::{RecvLoop, SendLoop, Timers},
36};
37use crate::{
38 api::{self, Command, Event, GossipApi, RpcMessage},
39 metrics::Metrics,
40 proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
41};
42
43mod discovery;
44mod util;
45
/// ALPN protocol identifier for gossip connections.
///
/// This is the value the actor dials with when no custom ALPN was configured
/// through [`Builder::alpn`].
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

/// Capacity of the per-connection channel that carries outgoing protocol messages from the
/// actor to a connection's send loop.
const SEND_QUEUE_CAP: usize = 64;
/// Capacity of the channel that carries RPC messages into the actor.
const TO_ACTOR_CAP: usize = 64;
/// Capacity of the channel that carries protocol input events into the actor.
const IN_EVENT_CAP: usize = 1024;
/// Capacity of the broadcast channel created for every topic.
const TOPIC_EVENT_CAP: usize = 256;
57
/// Protocol events, with [`PublicKey`] as the peer identifier.
pub type ProtoEvent = proto::Event<PublicKey>;
/// Protocol commands, with [`PublicKey`] as the peer identifier.
pub type ProtoCommand = proto::Command<PublicKey>;

// Module-private shorthands for the protocol types, all keyed by `PublicKey`.
type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;
67
/// Handle to a gossip actor.
///
/// Cloning the handle only clones the [`Arc`] around the shared [`Inner`] state. The handle
/// dereferences to [`GossipApi`], so the API methods can be called on it directly.
#[derive(Debug, Clone)]
pub struct Gossip {
    /// State shared between all clones of this handle.
    pub(crate) inner: Arc<Inner>,
}
90
91impl std::ops::Deref for Gossip {
92 type Target = GossipApi;
93 fn deref(&self) -> &Self::Target {
94 &self.inner.api
95 }
96}
97
/// Control messages sent from a [`Gossip`] handle to its actor over the local channel.
#[derive(Debug)]
enum LocalActorMessage {
    /// Hand a connection over to the actor.
    HandleConnection(Connection),
    /// Ask the actor to quit all topics and stop; `reply` is signalled once the topics have
    /// been quit.
    Shutdown { reply: oneshot::Sender<()> },
}
103
/// Errors returned by the [`Gossip`] handle.
#[allow(missing_docs)]
#[common_fields({
    backtrace: Option<snafu::Backtrace>,
    #[snafu(implicit)]
    span_trace: n0_snafu::SpanTrace,
})]
#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum Error {
    // Built when sending to the actor fails or when the actor's reply channel is dropped
    // (see the `From` impls for the channel error types).
    // NOTE(review): deliberately a plain comment, not `///` — snafu may use a variant's doc
    // comment as its `Display` text, which would change runtime output.
    ActorDropped {},
}
115
116impl<T> From<mpsc::error::SendError<T>> for Error {
117 fn from(_value: mpsc::error::SendError<T>) -> Self {
118 ActorDroppedSnafu.build()
119 }
120}
121impl From<oneshot::error::RecvError> for Error {
122 fn from(_value: oneshot::error::RecvError) -> Self {
123 ActorDroppedSnafu.build()
124 }
125}
126
/// State shared by all clones of a [`Gossip`] handle.
#[derive(Debug)]
pub(crate) struct Inner {
    /// API client; this is what [`Gossip`] dereferences to.
    api: GossipApi,
    /// Channel for control messages (new connections, shutdown) to the actor.
    local_tx: mpsc::Sender<LocalActorMessage>,
    /// Handle of the spawned actor task, kept for as long as this struct lives.
    _actor_handle: AbortOnDropHandle<()>,
    /// Maximum message size, read from the protocol state when the handle was created.
    max_message_size: usize,
    /// Metrics shared with the actor.
    metrics: Arc<Metrics>,
}
135
136impl ProtocolHandler for Gossip {
137 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
138 self.handle_connection(connection)
139 .await
140 .map_err(AcceptError::from_err)?;
141 Ok(())
142 }
143
144 async fn shutdown(&self) {
145 if let Err(err) = self.shutdown().await {
146 warn!("error while shutting down gossip: {err:#}");
147 }
148 }
149}
150
/// Builder for a [`Gossip`] instance; create one with [`Gossip::builder`].
#[derive(Debug, Clone)]
pub struct Builder {
    /// Protocol configuration handed to the protocol state.
    config: proto::Config,
    /// Custom ALPN; `None` means [`GOSSIP_ALPN`] is used.
    alpn: Option<Bytes>,
}
157
158impl Builder {
159 pub fn max_message_size(mut self, size: usize) -> Self {
162 self.config.max_message_size = size;
163 self
164 }
165
166 pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
168 self.config.membership = config;
169 self
170 }
171
172 pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
174 self.config.broadcast = config;
175 self
176 }
177
178 pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
186 self.alpn = Some(alpn.as_ref().to_vec().into());
187 self
188 }
189
190 pub fn spawn(self, endpoint: Endpoint) -> Gossip {
192 let metrics = Arc::new(Metrics::default());
193 let discovery = GossipDiscovery::default();
194 endpoint.discovery().add(discovery.clone());
195 let (actor, rpc_tx, local_tx) =
196 Actor::new(endpoint, self.config, metrics.clone(), self.alpn, discovery);
197 let me = actor.endpoint.node_id().fmt_short();
198 let max_message_size = actor.state.max_message_size();
199
200 let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));
201
202 let api = GossipApi::local(rpc_tx);
203
204 Gossip {
205 inner: Inner {
206 api,
207 local_tx,
208 _actor_handle: AbortOnDropHandle::new(actor_handle),
209 max_message_size,
210 metrics,
211 }
212 .into(),
213 }
214 }
215}
216
217impl Gossip {
218 pub fn builder() -> Builder {
220 Builder {
221 config: Default::default(),
222 alpn: None,
223 }
224 }
225
226 #[cfg(feature = "rpc")]
228 pub async fn listen(self, endpoint: quinn::Endpoint) {
229 self.inner.api.listen(endpoint).await
230 }
231
232 pub fn max_message_size(&self) -> usize {
234 self.inner.max_message_size
235 }
236
237 pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
241 self.inner
242 .local_tx
243 .send(LocalActorMessage::HandleConnection(conn))
244 .await?;
245 Ok(())
246 }
247
248 pub async fn shutdown(&self) -> Result<(), Error> {
253 let (reply, reply_rx) = oneshot::channel();
254 self.inner
255 .local_tx
256 .send(LocalActorMessage::Shutdown { reply })
257 .await?;
258 reply_rx.await?;
259 Ok(())
260 }
261
262 pub fn metrics(&self) -> &Arc<Metrics> {
264 &self.inner.metrics
265 }
266}
267
/// The gossip actor: owns the protocol state and drives all I/O around it.
struct Actor {
    /// ALPN passed to the dialer when connecting to peers.
    alpn: Bytes,
    /// Protocol state machine; consumes [`InEvent`]s and yields [`OutEvent`]s.
    state: proto::State<PublicKey, StdRng>,
    /// The endpoint whose node id and address this actor uses.
    endpoint: Endpoint,
    /// Tracks outgoing connection attempts.
    dialer: Dialer,
    /// Receives RPC messages from the API.
    rpc_rx: mpsc::Receiver<RpcMessage>,
    /// Receives control messages (connections, shutdown) from [`Gossip`] handles.
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Sender for protocol input events; a clone is handed to every connection task.
    in_event_tx: mpsc::Sender<InEvent>,
    /// Receives the protocol input events sent through `in_event_tx`.
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Timers scheduled by the protocol state.
    timers: Timers<Timer>,
    /// Per-topic state for the currently known topics.
    topics: HashMap<TopicId, TopicState>,
    /// Connection state per peer.
    peers: HashMap<NodeId, PeerState>,
    /// Command streams of all subscriptions, addressable by key.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Topics waiting to be quit by `process_quit_queue`.
    quit_queue: VecDeque<TopicId>,
    /// Running connection loops; each yields its peer, connection and result on completion.
    connection_tasks: JoinSet<(NodeId, Connection, Result<(), ConnectionLoopError>)>,
    /// Metrics shared with the [`Gossip`] handle.
    metrics: Arc<Metrics>,
    /// Tasks forwarding topic events to subscribers; each yields its topic id on completion.
    topic_event_forwarders: JoinSet<TopicId>,
    /// Discovery service that receives node addresses learned through gossip.
    discovery: GossipDiscovery,
}
300
301impl Actor {
302 fn new(
303 endpoint: Endpoint,
304 config: proto::Config,
305 metrics: Arc<Metrics>,
306 alpn: Option<Bytes>,
307 discovery: GossipDiscovery,
308 ) -> (
309 Self,
310 mpsc::Sender<RpcMessage>,
311 mpsc::Sender<LocalActorMessage>,
312 ) {
313 let peer_id = endpoint.node_id();
314 let dialer = Dialer::new(endpoint.clone());
315 let state = proto::State::new(
316 peer_id,
317 Default::default(),
318 config,
319 rand::rngs::StdRng::from_rng(&mut rand::rng()),
320 );
321 let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
322 let (local_tx, local_rx) = mpsc::channel(16);
323 let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);
324
325 let actor = Actor {
326 alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
327 endpoint,
328 state,
329 dialer,
330 rpc_rx,
331 in_event_rx,
332 in_event_tx,
333 timers: Timers::new(),
334 command_rx: StreamGroup::new().keyed(),
335 peers: Default::default(),
336 topics: Default::default(),
337 quit_queue: Default::default(),
338 connection_tasks: Default::default(),
339 metrics,
340 local_rx,
341 topic_event_forwarders: Default::default(),
342 discovery,
343 };
344
345 (actor, rpc_tx, local_tx)
346 }
347
348 pub async fn run(mut self) {
349 let mut addr_update_stream = self.setup().await;
350
351 let mut i = 0;
352 while self.event_loop(&mut addr_update_stream, i).await {
353 i += 1;
354 }
355 }
356
357 async fn setup(&mut self) -> impl Stream<Item = NodeAddr> + Send + Unpin + use<> {
362 let addr_update_stream = self.endpoint.watch_node_addr().stream();
363 let initial_addr = self.endpoint.node_addr();
364 self.handle_addr_update(initial_addr).await;
365 addr_update_stream
366 }
367
    /// Runs a single iteration of the actor loop: waits for the first event source that is
    /// ready and handles it.
    ///
    /// `addr_updates` yields new addresses of the local node. `i` is the iteration counter
    /// and is only used in trace output.
    ///
    /// Returns `false` when the actor should stop (a shutdown was requested, or the local or
    /// RPC channel was closed), and `true` otherwise.
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = NodeAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        tokio::select! {
            // With `biased`, branches are polled in the order they are written, so control
            // messages from the local channel are always looked at first.
            biased;
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        // The requester may have stopped waiting; that is not an error.
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        // Connections whose remote node id cannot be determined are ignored.
                        if let Ok(remote_node_id) = conn.remote_node_id() {
                            self.handle_connection(remote_node_id, ConnOrigin::Accept, conn);
                        }
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            // Only enabled while at least one command stream is registered.
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        // A failed dial only counts as a disconnect when the peer has no
                        // active connection.
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    // `None` is what the dial task yields when its cancellation token fired.
                    None => {
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                // The actor itself holds `in_event_tx`, so the channel cannot be closed here.
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                // Handle every timer popped for `now` with the same timestamp.
                let now = Instant::now();
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                // A forwarder finished; quit its topic if nothing needs it any more.
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }
479
480 async fn handle_addr_update(&mut self, node_addr: NodeAddr) {
481 let peer_data = encode_peer_data(&node_addr.into());
483 self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
484 .await
485 }
486
487 async fn handle_command(
488 &mut self,
489 topic: TopicId,
490 key: stream_group::Key,
491 command: Option<Command>,
492 ) {
493 debug!(?topic, ?key, ?command, "handle command");
494 let Some(state) = self.topics.get_mut(&topic) else {
495 warn!("received command for unknown topic");
497 return;
498 };
499 match command {
500 Some(command) => {
501 let command = match command {
502 Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
503 Command::BroadcastNeighbors(message) => {
504 ProtoCommand::Broadcast(message, Scope::Neighbors)
505 }
506 Command::JoinPeers(peers) => ProtoCommand::Join(peers),
507 };
508 self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
509 .await;
510 }
511 None => {
512 state.command_rx_keys.remove(&key);
513 if !state.still_needed() {
514 self.quit_queue.push_back(topic);
515 self.process_quit_queue().await;
516 }
517 }
518 }
519 }
520
521 fn handle_connection(&mut self, peer_id: NodeId, origin: ConnOrigin, conn: Connection) {
522 let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
523 let conn_id = conn.stable_id();
524
525 let queue = match self.peers.entry(peer_id) {
526 Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
527 Entry::Vacant(entry) => {
528 entry.insert(PeerState::Active {
529 active_send_tx: send_tx,
530 active_conn_id: conn_id,
531 other_conns: Vec::new(),
532 });
533 Vec::new()
534 }
535 };
536
537 let max_message_size = self.state.max_message_size();
538 let in_event_tx = self.in_event_tx.clone();
539
540 self.connection_tasks.spawn(
542 async move {
543 let res = connection_loop(
544 peer_id,
545 conn.clone(),
546 origin,
547 send_rx,
548 in_event_tx,
549 max_message_size,
550 queue,
551 )
552 .await;
553 (peer_id, conn, res)
554 }
555 .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
556 );
557 }
558
    /// Cleans up after a connection loop has ended.
    ///
    /// Closes the connection if it is still open. If it was the peer's active connection,
    /// the peer is reported to the protocol state as disconnected; otherwise its id is
    /// removed from the peer's list of other connections.
    #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
    async fn handle_connection_task_finished(
        &mut self,
        peer_id: NodeId,
        conn: Connection,
        task_result: Result<(), ConnectionLoopError>,
    ) {
        if conn.close_reason().is_none() {
            conn.close(0u32.into(), b"close from disconnect");
        }
        // Either the connection was closed already or it was closed right above.
        let reason = conn.close_reason().expect("just closed");
        let error = task_result.err();
        debug!(%reason, ?error, "connection closed");
        if let Some(PeerState::Active {
            active_conn_id,
            other_conns,
            ..
        }) = self.peers.get_mut(&peer_id)
        {
            if conn.stable_id() == *active_conn_id {
                debug!("active send connection closed, mark peer as disconnected");
                self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                    .await;
            } else {
                other_conns.retain(|x| *x != conn.stable_id());
                // NOTE(review): the `+ 1` presumably accounts for the active connection — confirm.
                debug!("remaining {} other connections", other_conns.len() + 1);
            }
        } else {
            debug!("peer already marked as disconnected");
        }
    }
590
591 async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
592 trace!("handle to_actor {msg:?}");
593 match msg {
594 RpcMessage::Join(msg) => {
595 let WithChannels {
596 inner,
597 rx,
598 tx,
599 span: _,
601 } = msg;
602 let api::JoinRequest {
603 topic_id,
604 bootstrap,
605 } = inner;
606 let TopicState {
607 neighbors,
608 event_sender,
609 command_rx_keys,
610 } = self.topics.entry(topic_id).or_default();
611 let mut sender_dead = false;
612 if !neighbors.is_empty() {
613 for neighbor in neighbors.iter() {
614 if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
615 sender_dead = true;
616 break;
617 }
618 }
619 }
620
621 if !sender_dead {
622 let fut =
623 topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
624 self.topic_event_forwarders
625 .spawn(fut.instrument(tracing::Span::current()));
626 }
627 let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
628 let key = self.command_rx.insert(command_rx);
629 command_rx_keys.insert(key);
630
631 self.handle_in_event(
632 InEvent::Command(
633 topic_id,
634 ProtoCommand::Join(bootstrap.into_iter().collect()),
635 ),
636 now,
637 )
638 .await;
639 }
640 }
641 }
642
    /// Feeds `event` into the protocol state, then quits every topic that was queued for
    /// quitting while the event was handled.
    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }
647
648 async fn process_quit_queue(&mut self) {
649 while let Some(topic_id) = self.quit_queue.pop_front() {
650 self.handle_in_event_inner(
651 InEvent::Command(topic_id, ProtoCommand::Quit),
652 Instant::now(),
653 )
654 .await;
655 if self.topics.remove(&topic_id).is_some() {
656 tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
657 }
658 }
659 }
660
    /// Passes `event` to the protocol state and carries out every output event it produces.
    ///
    /// Topics that turn out to be no longer needed are only pushed onto the quit queue here;
    /// the caller is responsible for processing that queue.
    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        // Timer events are logged at trace level only, everything else at debug level.
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    // Unknown peers start out as `Pending` (the `Default` of `PeerState`).
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            // The first queued message triggers the dial; later messages
                            // are only queued.
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    // Keep the neighbor set in sync, so that it can be replayed to new
                    // subscribers.
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    // Sending fails when there is no subscriber; that is not an error.
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(node_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {node_id}: {err}"),
                    Ok(info) => {
                        debug!(peer = ?node_id, "add known addrs: {info:?}");
                        let node_addr = NodeAddr {
                            node_id,
                            relay_url: info.relay_url,
                            direct_addresses: info.direct_addresses,
                        };
                        self.discovery.add(node_addr);
                    }
                },
            }
        }
    }
745}
746
/// Identifier of a connection; the actor fills it with the connection's `stable_id()`.
type ConnId = usize;
748
/// Connection state of a peer, as tracked by the actor.
#[derive(Debug)]
enum PeerState {
    /// No connection has been registered yet; outgoing messages are collected in `queue`.
    Pending {
        queue: Vec<ProtoMessage>,
    },
    /// At least one connection has been registered.
    Active {
        /// Channel to the send loop of the active connection.
        active_send_tx: mpsc::Sender<ProtoMessage>,
        /// Id of the connection that is used for sending.
        active_conn_id: ConnId,
        /// Ids of previously active connections that were replaced by a newer one.
        other_conns: Vec<ConnId>,
    },
}
760
761impl PeerState {
762 fn accept_conn(
763 &mut self,
764 send_tx: mpsc::Sender<ProtoMessage>,
765 conn_id: ConnId,
766 ) -> Vec<ProtoMessage> {
767 match self {
768 PeerState::Pending { queue } => {
769 let queue = std::mem::take(queue);
770 *self = PeerState::Active {
771 active_send_tx: send_tx,
772 active_conn_id: conn_id,
773 other_conns: Vec::new(),
774 };
775 queue
776 }
777 PeerState::Active {
778 active_send_tx,
779 active_conn_id,
780 other_conns,
781 } => {
782 other_conns.push(*active_conn_id);
788 *active_send_tx = send_tx;
789 *active_conn_id = conn_id;
790 Vec::new()
791 }
792 }
793 }
794}
795
796impl Default for PeerState {
797 fn default() -> Self {
798 PeerState::Pending { queue: Vec::new() }
799 }
800}
801
/// State the actor keeps for each topic.
#[derive(Debug)]
struct TopicState {
    /// Current neighbors, maintained from the `NeighborUp` and `NeighborDown` events.
    neighbors: BTreeSet<NodeId>,
    /// Broadcast channel on which the topic's events are published to subscribers.
    event_sender: broadcast::Sender<ProtoEvent>,
    /// Keys of this topic's command streams inside the actor's `command_rx`.
    command_rx_keys: HashSet<stream_group::Key>,
}
811
812impl Default for TopicState {
813 fn default() -> Self {
814 let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
815 Self {
816 neighbors: Default::default(),
817 command_rx_keys: Default::default(),
818 event_sender,
819 }
820 }
821}
822
823impl TopicState {
824 fn still_needed(&self) -> bool {
826 !self.command_rx_keys.is_empty() && self.event_sender.receiver_count() > 0
827 }
828
829 #[cfg(test)]
830 fn joined(&self) -> bool {
831 !self.neighbors.is_empty()
832 }
833}
834
/// Whether a connection was accepted from a remote peer or dialed by us.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    /// The remote peer connected to us.
    Accept,
    /// We connected to the remote peer.
    Dial,
}
841
/// Errors that end a connection loop.
// NOTE(review): the variants carry plain `//` comments on purpose — snafu may use a
// variant's doc comment as its `Display` text, which would change runtime output.
#[allow(missing_docs)]
#[common_fields({
    backtrace: Option<snafu::Backtrace>,
})]
#[derive(Debug, Snafu)]
#[snafu(module)]
#[non_exhaustive]
enum ConnectionLoopError {
    // Wraps a write error from the send loop.
    #[snafu(transparent)]
    Write {
        source: self::util::WriteError,
    },
    // Wraps a read error from the receive loop.
    #[snafu(transparent)]
    Read {
        source: self::util::ReadError,
    },
    // Wraps an error of the connection itself.
    #[snafu(transparent)]
    Connection {
        source: iroh::endpoint::ConnectionError,
    },
    // Built when sending on an mpsc channel fails (see the `From<SendError>` impl).
    ActorDropped {},
}
864
865impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
866 fn from(_value: mpsc::error::SendError<T>) -> Self {
867 self::connection_loop_error::ActorDroppedSnafu.build()
868 }
869}
870
871async fn connection_loop(
872 from: PublicKey,
873 conn: Connection,
874 origin: ConnOrigin,
875 send_rx: mpsc::Receiver<ProtoMessage>,
876 in_event_tx: mpsc::Sender<InEvent>,
877 max_message_size: usize,
878 queue: Vec<ProtoMessage>,
879) -> Result<(), ConnectionLoopError> {
880 debug!(?origin, "connection established");
881
882 let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
883 let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);
884
885 let send_fut = send_loop.run(queue).instrument(error_span!("send"));
886 let recv_fut = recv_loop.run().instrument(error_span!("recv"));
887
888 let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
889 send_res?;
890 recv_res?;
891 Ok(())
892}
893
/// Address information of a node: the parts of a [`NodeAddr`] other than the node id.
///
/// This is the value that is serialized into gossip peer data.
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct AddrInfo {
    /// Relay URL of the node, if any.
    relay_url: Option<RelayUrl>,
    /// Direct socket addresses of the node.
    direct_addresses: BTreeSet<SocketAddr>,
}
899
900impl From<NodeAddr> for AddrInfo {
901 fn from(
902 NodeAddr {
903 relay_url,
904 direct_addresses,
905 ..
906 }: NodeAddr,
907 ) -> Self {
908 Self {
909 relay_url,
910 direct_addresses,
911 }
912 }
913}
914
915fn encode_peer_data(info: &AddrInfo) -> PeerData {
916 let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo may not fail");
917 PeerData::new(bytes)
918}
919
920fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
921 let bytes = peer_data.as_bytes();
922 if bytes.is_empty() {
923 return Ok(AddrInfo::default());
924 }
925 let info = postcard::from_bytes(bytes)?;
926 Ok(info)
927}
928
929async fn topic_subscriber_loop(
930 sender: irpc::channel::mpsc::Sender<Event>,
931 mut topic_events: broadcast::Receiver<ProtoEvent>,
932) {
933 loop {
934 tokio::select! {
935 biased;
936 msg = topic_events.recv() => {
937 let event = match msg {
938 Err(broadcast::error::RecvError::Closed) => break,
939 Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
940 Ok(event) => event.into(),
941 };
942 if sender.send(event).await.is_err() {
943 break;
944 }
945 }
946 _ = sender.closed() => break,
947 }
948 }
949}
950
/// Boxed stream of the commands received from one subscription, or the receive error.
type BoxedCommandReceiver =
    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;
954
/// Command stream of a single subscription, tagged with the topic it belongs to.
#[derive(derive_more::Debug)]
struct TopicCommandStream {
    /// Topic that the commands of this stream apply to.
    topic_id: TopicId,
    /// The underlying command stream.
    #[debug("CommandStream")]
    stream: BoxedCommandReceiver,
    /// Set once the underlying stream has ended or failed.
    closed: bool,
}
962
963impl TopicCommandStream {
964 fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
965 Self {
966 topic_id,
967 stream,
968 closed: false,
969 }
970 }
971}
972
973impl Stream for TopicCommandStream {
974 type Item = (TopicId, Option<Command>);
975 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
976 if self.closed {
977 return Poll::Ready(None);
978 }
979 match Pin::new(&mut self.stream).poll_next(cx) {
980 Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
981 Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
982 self.closed = true;
983 Poll::Ready(Some((self.topic_id, None)))
984 }
985 Poll::Pending => Poll::Pending,
986 }
987 }
988}
989
/// Tracks outgoing connection attempts.
#[derive(Debug)]
struct Dialer {
    /// Endpoint used to connect.
    endpoint: Endpoint,
    /// Running dial tasks; each yields the dialed node and the outcome, where `None` means
    /// the dial was cancelled.
    pending: JoinSet<(
        NodeId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    )>,
    /// Cancellation token per node that currently has a dial in progress.
    pending_dials: HashMap<NodeId, CancellationToken>,
}
999
1000impl Dialer {
1001 fn new(endpoint: Endpoint) -> Self {
1003 Self {
1004 endpoint,
1005 pending: Default::default(),
1006 pending_dials: Default::default(),
1007 }
1008 }
1009
1010 fn queue_dial(&mut self, node_id: NodeId, alpn: Bytes) {
1012 if self.is_pending(node_id) {
1013 return;
1014 }
1015 let cancel = CancellationToken::new();
1016 self.pending_dials.insert(node_id, cancel.clone());
1017 let endpoint = self.endpoint.clone();
1018 self.pending.spawn(
1019 async move {
1020 let res = tokio::select! {
1021 biased;
1022 _ = cancel.cancelled() => None,
1023 res = endpoint.connect(node_id, &alpn) => Some(res),
1024 };
1025 (node_id, res)
1026 }
1027 .instrument(tracing::Span::current()),
1028 );
1029 }
1030
1031 fn is_pending(&self, node: NodeId) -> bool {
1033 self.pending_dials.contains_key(&node)
1034 }
1035
1036 async fn next_conn(
1039 &mut self,
1040 ) -> (
1041 NodeId,
1042 Option<Result<Connection, iroh::endpoint::ConnectError>>,
1043 ) {
1044 match self.pending_dials.is_empty() {
1045 false => {
1046 let (node_id, res) = loop {
1047 match self.pending.join_next().await {
1048 Some(Ok((node_id, res))) => {
1049 self.pending_dials.remove(&node_id);
1050 break (node_id, res);
1051 }
1052 Some(Err(e)) => {
1053 error!("next conn error: {:?}", e);
1054 }
1055 None => {
1056 error!("no more pending conns available");
1057 std::future::pending().await
1058 }
1059 }
1060 };
1061
1062 (node_id, res)
1063 }
1064 true => std::future::pending().await,
1065 }
1066 }
1067}
1068
1069#[cfg(test)]
1070pub(crate) mod test {
1071 use std::time::Duration;
1072
1073 use bytes::Bytes;
1074 use futures_concurrency::future::TryJoin;
1075 use iroh::{
1076 discovery::static_provider::StaticProvider, endpoint::BindError, protocol::Router,
1077 RelayMap, RelayMode, SecretKey,
1078 };
1079 use n0_snafu::{Result, ResultExt};
1080 use rand::{CryptoRng, Rng};
1081 use tokio::{spawn, time::timeout};
1082 use tokio_util::sync::CancellationToken;
1083 use tracing::{info, instrument};
1084 use tracing_test::traced_test;
1085
1086 use super::*;
1087 use crate::api::{ApiError, GossipReceiver, GossipSender};
1088
    /// Test helper that drives an [`Actor`] one event-loop iteration at a time.
    struct ManualActorLoop {
        /// The actor being driven.
        actor: Actor,
        /// Number of iterations run so far; passed to the event loop as its tick counter.
        step: usize,
    }
1093
1094 impl std::ops::Deref for ManualActorLoop {
1095 type Target = Actor;
1096
1097 fn deref(&self) -> &Self::Target {
1098 &self.actor
1099 }
1100 }
1101
1102 impl std::ops::DerefMut for ManualActorLoop {
1103 fn deref_mut(&mut self) -> &mut Self::Target {
1104 &mut self.actor
1105 }
1106 }
1107
    /// Join handle of a spawned `endpoint_loop` task.
    type EndpointHandle = tokio::task::JoinHandle<Result<()>>;
1109
1110 impl ManualActorLoop {
1111 #[instrument(skip_all, fields(me = %actor.endpoint.node_id().fmt_short()))]
1112 async fn new(mut actor: Actor) -> Self {
1113 let _ = actor.setup().await;
1114 Self { actor, step: 0 }
1115 }
1116
1117 #[instrument(skip_all, fields(me = %self.endpoint.node_id().fmt_short()))]
1118 async fn step(&mut self) -> bool {
1119 let ManualActorLoop { actor, step } = self;
1120 *step += 1;
1121 let addr_update_stream = &mut futures_lite::stream::pending();
1124 actor.event_loop(addr_update_stream, *step).await
1125 }
1126
1127 async fn steps(&mut self, n: usize) {
1128 for _ in 0..n {
1129 self.step().await;
1130 }
1131 }
1132
1133 async fn finish(mut self) {
1134 while self.step().await {}
1135 }
1136 }
1137
1138 impl Gossip {
1139 async fn t_new_with_actor(
1146 rng: &mut rand_chacha::ChaCha12Rng,
1147 config: proto::Config,
1148 relay_map: RelayMap,
1149 cancel: &CancellationToken,
1150 ) -> Result<(Self, Actor, EndpointHandle), BindError> {
1151 let endpoint = create_endpoint(rng, relay_map, None).await?;
1152 let metrics = Arc::new(Metrics::default());
1153 let discovery = GossipDiscovery::default();
1154 endpoint.discovery().add(discovery.clone());
1155
1156 let (actor, to_actor_tx, conn_tx) =
1157 Actor::new(endpoint, config, metrics.clone(), None, discovery);
1158 let max_message_size = actor.state.max_message_size();
1159
1160 let _actor_handle =
1161 AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
1162 let gossip = Self {
1163 inner: Inner {
1164 api: GossipApi::local(to_actor_tx),
1165 local_tx: conn_tx,
1166 _actor_handle,
1167 max_message_size,
1168 metrics,
1169 }
1170 .into(),
1171 };
1172
1173 let endpoint_task = task::spawn(endpoint_loop(
1174 actor.endpoint.clone(),
1175 gossip.clone(),
1176 cancel.child_token(),
1177 ));
1178
1179 Ok((gossip, actor, endpoint_task))
1180 }
1181
1182 async fn t_new(
1184 rng: &mut rand_chacha::ChaCha12Rng,
1185 config: proto::Config,
1186 relay_map: RelayMap,
1187 cancel: &CancellationToken,
1188 ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
1189 let (g, actor, ep_handle) =
1190 Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
1191 let ep = actor.endpoint.clone();
1192 let me = ep.node_id().fmt_short();
1193 let actor_handle =
1194 task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
1195 Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
1196 }
1197 }
1198
1199 pub(crate) async fn create_endpoint(
1200 rng: &mut rand_chacha::ChaCha12Rng,
1201 relay_map: RelayMap,
1202 static_provider: Option<StaticProvider>,
1203 ) -> Result<Endpoint, BindError> {
1204 let ep = Endpoint::builder()
1205 .secret_key(SecretKey::generate(rng))
1206 .alpns(vec![GOSSIP_ALPN.to_vec()])
1207 .relay_mode(RelayMode::Custom(relay_map))
1208 .insecure_skip_relay_cert_verify(true)
1209 .bind()
1210 .await?;
1211
1212 if let Some(static_provider) = static_provider {
1213 ep.discovery().add(static_provider);
1214 }
1215 ep.online().await;
1216 Ok(ep)
1217 }
1218
1219 async fn endpoint_loop(
1220 endpoint: Endpoint,
1221 gossip: Gossip,
1222 cancel: CancellationToken,
1223 ) -> Result<()> {
1224 loop {
1225 tokio::select! {
1226 biased;
1227 _ = cancel.cancelled() => break,
1228 incoming = endpoint.accept() => match incoming {
1229 None => break,
1230 Some(incoming) => {
1231 let connecting = match incoming.accept() {
1232 Ok(connecting) => connecting,
1233 Err(err) => {
1234 warn!("incoming connection failed: {err:#}");
1235 continue;
1238 }
1239 };
1240 gossip.handle_connection(connecting.await.e()?).await?
1241 }
1242 }
1243 }
1244 }
1245 Ok(())
1246 }
1247
1248 #[tokio::test]
1249 #[traced_test]
1250 async fn gossip_net_smoke() {
1251 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1252 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1253
1254 let static_provider = StaticProvider::new();
1255
1256 let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
1257 .await
1258 .unwrap();
1259 let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
1260 .await
1261 .unwrap();
1262 let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
1263 .await
1264 .unwrap();
1265
1266 let go1 = Gossip::builder().spawn(ep1.clone());
1267 let go2 = Gossip::builder().spawn(ep2.clone());
1268 let go3 = Gossip::builder().spawn(ep3.clone());
1269 debug!("peer1 {:?}", ep1.node_id());
1270 debug!("peer2 {:?}", ep2.node_id());
1271 debug!("peer3 {:?}", ep3.node_id());
1272 let pi1 = ep1.node_id();
1273 let pi2 = ep2.node_id();
1274
1275 let cancel = CancellationToken::new();
1276 let tasks = [
1277 spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
1278 spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
1279 spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
1280 ];
1281
1282 debug!("----- adding peers ----- ");
1283 let topic: TopicId = blake3::hash(b"foobar").into();
1284
1285 let addr1 = NodeAddr::new(pi1).with_relay_url(relay_url.clone());
1286 let addr2 = NodeAddr::new(pi2).with_relay_url(relay_url);
1287 static_provider.add_node_info(addr1.clone());
1288 static_provider.add_node_info(addr2.clone());
1289
1290 debug!("----- joining ----- ");
1291 let [sub1, mut sub2, mut sub3] = [
1293 go1.subscribe_and_join(topic, vec![]),
1294 go2.subscribe_and_join(topic, vec![pi1]),
1295 go3.subscribe_and_join(topic, vec![pi2]),
1296 ]
1297 .try_join()
1298 .await
1299 .unwrap();
1300
1301 let (sink1, _stream1) = sub1.split();
1302
1303 let len = 2;
1304
1305 let pub1 = spawn(async move {
1307 for i in 0..len {
1308 let message = format!("hi{i}");
1309 info!("go1 broadcast: {message:?}");
1310 sink1.broadcast(message.into_bytes().into()).await.unwrap();
1311 tokio::time::sleep(Duration::from_micros(1)).await;
1312 }
1313 });
1314
1315 let sub2 = spawn(async move {
1317 let mut recv = vec![];
1318 loop {
1319 let ev = sub2.next().await.unwrap().unwrap();
1320 info!("go2 event: {ev:?}");
1321 if let Event::Received(msg) = ev {
1322 recv.push(msg.content);
1323 }
1324 if recv.len() == len {
1325 return recv;
1326 }
1327 }
1328 });
1329
1330 let sub3 = spawn(async move {
1332 let mut recv = vec![];
1333 loop {
1334 let ev = sub3.next().await.unwrap().unwrap();
1335 info!("go3 event: {ev:?}");
1336 if let Event::Received(msg) = ev {
1337 recv.push(msg.content);
1338 }
1339 if recv.len() == len {
1340 return recv;
1341 }
1342 }
1343 });
1344
1345 timeout(Duration::from_secs(10), pub1)
1346 .await
1347 .unwrap()
1348 .unwrap();
1349 let recv2 = timeout(Duration::from_secs(10), sub2)
1350 .await
1351 .unwrap()
1352 .unwrap();
1353 let recv3 = timeout(Duration::from_secs(10), sub3)
1354 .await
1355 .unwrap()
1356 .unwrap();
1357
1358 let expected: Vec<Bytes> = (0..len)
1359 .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
1360 .collect();
1361 assert_eq!(recv2, expected);
1362 assert_eq!(recv3, expected);
1363
1364 cancel.cancel();
1365 for t in tasks {
1366 timeout(Duration::from_secs(10), t)
1367 .await
1368 .unwrap()
1369 .unwrap()
1370 .unwrap();
1371 }
1372 }
1373
1374 #[tokio::test]
1384 #[traced_test]
1385 async fn subscription_cleanup() -> Result {
1386 let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1387 let ct = CancellationToken::new();
1388 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1389
1390 let (go1, actor, ep1_handle) =
1392 Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
1393 let mut actor = ManualActorLoop::new(actor).await;
1394
1395 let (go2, ep2, ep2_handle, _test_actor_handle) =
1397 Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1398
1399 let node_id1 = actor.endpoint.node_id();
1400 let node_id2 = ep2.node_id();
1401 tracing::info!(
1402 node_1 = %node_id1.fmt_short(),
1403 node_2 = %node_id2.fmt_short(),
1404 "nodes ready"
1405 );
1406
1407 let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
1408 tracing::info!(%topic, "joining");
1409
1410 let ct2 = ct.clone();
1417 let go2_task = async move {
1418 let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();
1419
1420 let subscribe_fut = async {
1421 while let Some(ev) = sub_rx.try_next().await? {
1422 match ev {
1423 Event::Lagged => tracing::debug!("missed some messages :("),
1424 Event::Received(_) => unreachable!("test does not send messages"),
1425 other => tracing::debug!(?other, "gs event"),
1426 }
1427 }
1428
1429 tracing::debug!("subscribe stream ended");
1430 Result::<_, n0_snafu::Error>::Ok(())
1431 };
1432
1433 tokio::select! {
1434 _ = ct2.cancelled() => Ok(()),
1435 res = subscribe_fut => res,
1436 }
1437 }
1438 .instrument(tracing::debug_span!("node_2", %node_id2));
1439 let go2_handle = task::spawn(go2_task);
1440
1441 let addr2 = NodeAddr::new(node_id2).with_relay_url(relay_url);
1443 let static_provider = StaticProvider::new();
1444 static_provider.add_node_info(addr2);
1445 actor.endpoint.discovery().add(static_provider);
1446 let (tx, mut rx) = mpsc::channel::<()>(1);
1448 let ct1 = ct.clone();
1449 let go1_task = async move {
1450 tracing::info!("subscribing the first time");
1452 let sub_1a = go1.subscribe_and_join(topic, vec![node_id2]).await?;
1453
1454 rx.recv().await.expect("signal for second subscribe");
1456 tracing::info!("subscribing a second time");
1457 let sub_1b = go1.subscribe_and_join(topic, vec![node_id2]).await?;
1458 drop(sub_1a);
1459
1460 rx.recv().await.expect("signal for second subscribe");
1462 tracing::info!("dropping all handles");
1463 drop(sub_1b);
1464
1465 ct1.cancelled().await;
1467 drop(go1);
1468
1469 Result::<_, n0_snafu::Error>::Ok(())
1470 }
1471 .instrument(tracing::debug_span!("node_1", %node_id1));
1472 let go1_handle = task::spawn(go1_task);
1473
1474 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1479 assert!(state.joined());
1480
1481 tx.send(()).await.e()?;
1483 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1485 assert!(state.joined());
1486
1487 tx.send(()).await.e()?;
1489 actor.steps(2).await; assert!(!actor.topics.contains_key(&topic));
1491
1492 ct.cancel();
1494 let wait = Duration::from_secs(2);
1495 timeout(wait, ep1_handle).await.e()?.e()??;
1496 timeout(wait, ep2_handle).await.e()?.e()??;
1497 timeout(wait, go1_handle).await.e()?.e()??;
1498 timeout(wait, go2_handle).await.e()?.e()??;
1499 timeout(wait, actor.finish()).await.e()?;
1500
1501 Ok(())
1502 }
1503
1504 #[tokio::test]
1511 #[traced_test]
1512 async fn can_reconnect() -> Result {
1513 let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1514 let ct = CancellationToken::new();
1515 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1516
1517 let (go1, ep1, ep1_handle, _test_actor_handle1) =
1518 Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;
1519
1520 let (go2, ep2, ep2_handle, _test_actor_handle2) =
1521 Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1522
1523 let node_id1 = ep1.node_id();
1524 let node_id2 = ep2.node_id();
1525 tracing::info!(
1526 node_1 = %node_id1.fmt_short(),
1527 node_2 = %node_id2.fmt_short(),
1528 "nodes ready"
1529 );
1530
1531 let topic: TopicId = blake3::hash(b"can_reconnect").into();
1532 tracing::info!(%topic, "joining");
1533
1534 let ct2 = ct.child_token();
1535 let (tx, mut rx) = mpsc::channel::<()>(1);
1537 let addr1 = NodeAddr::new(node_id1).with_relay_url(relay_url.clone());
1538 let static_provider = StaticProvider::new();
1539 static_provider.add_node_info(addr1);
1540 ep2.discovery().add(static_provider.clone());
1541 let go2_task = async move {
1542 let mut sub = go2.subscribe(topic, Vec::new()).await?;
1543 sub.joined().await?;
1544
1545 rx.recv().await.expect("signal to unsubscribe");
1546 tracing::info!("unsubscribing");
1547 drop(sub);
1548
1549 rx.recv().await.expect("signal to subscribe again");
1550 tracing::info!("resubscribing");
1551 let mut sub = go2.subscribe(topic, vec![node_id1]).await?;
1552
1553 sub.joined().await?;
1554 tracing::info!("subscription successful!");
1555
1556 ct2.cancelled().await;
1557
1558 Result::<_, ApiError>::Ok(())
1559 }
1560 .instrument(tracing::debug_span!("node_2", %node_id2));
1561 let go2_handle = task::spawn(go2_task);
1562
1563 let addr2 = NodeAddr::new(node_id2).with_relay_url(relay_url);
1564 static_provider.add_node_info(addr2);
1565 ep1.discovery().add(static_provider);
1566
1567 let mut sub = go1.subscribe(topic, vec![node_id2]).await?;
1568 sub.joined().await?;
1570
1571 tx.send(()).await.e()?;
1573
1574 let conn_timeout = Duration::from_millis(500);
1576 let ev = timeout(conn_timeout, sub.try_next()).await.e()??;
1577 assert_eq!(ev, Some(Event::NeighborDown(node_id2)));
1578 tracing::info!("node 2 left");
1579
1580 tx.send(()).await.e()?;
1582
1583 let conn_timeout = Duration::from_millis(500);
1584 let ev = timeout(conn_timeout, sub.try_next()).await.e()??;
1585 assert_eq!(ev, Some(Event::NeighborUp(node_id2)));
1586 tracing::info!("node 2 rejoined!");
1587
1588 ct.cancel();
1590 let wait = Duration::from_secs(2);
1591 timeout(wait, ep1_handle).await.e()?.e()??;
1592 timeout(wait, ep2_handle).await.e()?.e()??;
1593 timeout(wait, go2_handle).await.e()?.e()??;
1594
1595 Result::Ok(())
1596 }
1597
1598 #[tokio::test]
1599 #[traced_test]
1600 async fn can_die_and_reconnect() -> Result {
1601 fn run_in_thread<T: Send + 'static>(
1604 cancel: CancellationToken,
1605 fut: impl std::future::Future<Output = T> + Send + 'static,
1606 ) -> std::thread::JoinHandle<Option<T>> {
1607 std::thread::spawn(move || {
1608 let rt = tokio::runtime::Builder::new_current_thread()
1609 .enable_all()
1610 .build()
1611 .unwrap();
1612 rt.block_on(async move { cancel.run_until_cancelled(fut).await })
1613 })
1614 }
1615
1616 async fn spawn_gossip(
1618 secret_key: SecretKey,
1619 relay_map: RelayMap,
1620 ) -> Result<(Router, Gossip), BindError> {
1621 let ep = Endpoint::builder()
1622 .secret_key(secret_key)
1623 .relay_mode(RelayMode::Custom(relay_map))
1624 .insecure_skip_relay_cert_verify(true)
1625 .bind()
1626 .await?;
1627 let gossip = Gossip::builder().spawn(ep.clone());
1628 let router = Router::builder(ep)
1629 .accept(GOSSIP_ALPN, gossip.clone())
1630 .spawn();
1631 Ok((router, gossip))
1632 }
1633
1634 async fn broadcast_once(
1636 secret_key: SecretKey,
1637 relay_map: RelayMap,
1638 bootstrap_addr: NodeAddr,
1639 topic_id: TopicId,
1640 message: String,
1641 ) -> Result {
1642 let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
1643 info!(node_id = %router.endpoint().node_id().fmt_short(), "broadcast node spawned");
1644 let bootstrap = vec![bootstrap_addr.node_id];
1645 let static_provider = StaticProvider::new();
1646 static_provider.add_node_info(bootstrap_addr);
1647 router.endpoint().discovery().add(static_provider);
1648 let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
1649 topic.broadcast(message.as_bytes().to_vec().into()).await?;
1650 std::future::pending::<()>().await;
1651 Ok(())
1652 }
1653
1654 let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1655 let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1656 let topic_id = TopicId::from_bytes(rng.random());
1657
1658 let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
1661 let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
1662 let recv_task = tokio::task::spawn({
1663 let relay_map = relay_map.clone();
1664 let secret_key = SecretKey::generate(&mut rng);
1665 async move {
1666 let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
1667 router.endpoint().online().await;
1672 let addr = router.endpoint().node_addr();
1673 info!(node_id = %addr.node_id.fmt_short(), "recv node spawned");
1674 addr_tx.send(addr).unwrap();
1675 let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
1676 while let Some(event) = topic.try_next().await.unwrap() {
1677 if let Event::Received(message) = event {
1678 let message = std::str::from_utf8(&message.content).e()?.to_string();
1679 msgs_recv_tx.send(message).await.e()?;
1680 }
1681 }
1682 Result::<_, n0_snafu::Error>::Ok(())
1683 }
1684 });
1685
1686 let node0_addr = addr_rx.await.e()?;
1687 let max_wait = Duration::from_secs(5);
1688
1689 let cancel = CancellationToken::new();
1692 let secret = SecretKey::generate(&mut rng);
1693 let join_handle_1 = run_in_thread(
1694 cancel.clone(),
1695 broadcast_once(
1696 secret.clone(),
1697 relay_map.clone(),
1698 node0_addr.clone(),
1699 topic_id,
1700 "msg1".to_string(),
1701 ),
1702 );
1703 let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap();
1705 assert_eq!(&msg, "msg1");
1706 info!("kill broadcast node");
1707 cancel.cancel();
1708
1709 let cancel = CancellationToken::new();
1711 let join_handle_2 = run_in_thread(
1712 cancel.clone(),
1713 broadcast_once(
1714 secret.clone(),
1715 relay_map.clone(),
1716 node0_addr.clone(),
1717 topic_id,
1718 "msg2".to_string(),
1719 ),
1720 );
1721 let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap();
1724 assert_eq!(&msg, "msg2");
1725 info!("kill broadcast node");
1726 cancel.cancel();
1727
1728 info!("kill recv node");
1729 recv_task.abort();
1730 assert!(join_handle_1.join().unwrap().is_none());
1731 assert!(join_handle_2.join().unwrap().is_none());
1732
1733 Ok(())
1734 }
1735
1736 #[tokio::test]
1737 #[traced_test]
1738 async fn gossip_change_alpn() -> n0_snafu::Result<()> {
1739 let alpn = b"my-gossip-alpn";
1740 let topic_id = TopicId::from([0u8; 32]);
1741
1742 let ep1 = Endpoint::builder().bind().await?;
1743 let ep2 = Endpoint::builder().bind().await?;
1744 let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
1745 let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
1746 let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
1747 let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();
1748
1749 let addr1 = router1.endpoint().node_addr();
1750 let id1 = addr1.node_id;
1751 let static_provider = StaticProvider::new();
1752 static_provider.add_node_info(addr1);
1753 router2.endpoint().discovery().add(static_provider);
1754
1755 let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
1756 let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;
1757
1758 timeout(Duration::from_secs(3), topic1.joined())
1759 .await
1760 .e()??;
1761 timeout(Duration::from_secs(3), topic2.joined())
1762 .await
1763 .e()??;
1764 router1.shutdown().await.e()?;
1765 router2.shutdown().await.e()?;
1766 Ok(())
1767 }
1768
1769 #[tokio::test]
1770 #[traced_test]
1771 async fn gossip_rely_on_gossip_discovery() -> n0_snafu::Result<()> {
1772 let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1773
1774 async fn spawn(
1775 rng: &mut impl CryptoRng,
1776 ) -> n0_snafu::Result<(NodeId, Router, Gossip, GossipSender, GossipReceiver)> {
1777 let topic_id = TopicId::from([0u8; 32]);
1778 let ep = Endpoint::builder()
1779 .secret_key(SecretKey::generate(rng))
1780 .relay_mode(RelayMode::Disabled)
1781 .bind()
1782 .await?;
1783 let node_id = ep.node_id();
1784 let gossip = Gossip::builder().spawn(ep.clone());
1785 let router = Router::builder(ep)
1786 .accept(GOSSIP_ALPN, gossip.clone())
1787 .spawn();
1788 let topic = gossip.subscribe(topic_id, vec![]).await?;
1789 let (sender, receiver) = topic.split();
1790 Ok((node_id, router, gossip, sender, receiver))
1791 }
1792
1793 let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
1795 let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
1796 let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;
1797
1798 println!("nodes {:?}", [n1, n2, n3]);
1799
1800 let addr1 = r1.endpoint().node_addr();
1802 let disco = StaticProvider::new();
1803 disco.add_node_info(addr1);
1804
1805 r2.endpoint().discovery().add(disco.clone());
1807 tx2.join_peers(vec![n1]).await?;
1808
1809 timeout(Duration::from_secs(3), rx1.joined()).await.e()??;
1811 timeout(Duration::from_secs(3), rx2.joined()).await.e()??;
1812
1813 r3.endpoint().discovery().add(disco.clone());
1815 tx3.join_peers(vec![n1]).await?;
1816
1817 let ev = timeout(Duration::from_secs(3), rx3.next()).await.e()?;
1820 assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
1821 let ev = timeout(Duration::from_secs(3), rx3.next()).await.e()?;
1822 assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
1823
1824 assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));
1825
1826 let ev = timeout(Duration::from_secs(3), rx2.next()).await.e()?;
1827 assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));
1828
1829 let ev = timeout(Duration::from_secs(3), rx1.next()).await.e()?;
1830 assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));
1831
1832 tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown()).e()?;
1833 Ok(())
1834 }
1835
/// Collects `input` into a `Vec` and returns it sorted in ascending order.
fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
    let mut items = Vec::new();
    items.extend(input);
    items.sort();
    items
}
1841}