1use std::{
4 collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
5 net::SocketAddr,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_concurrency::stream::{stream_group, StreamGroup};
13use futures_util::FutureExt as _;
14use iroh::{
15 endpoint::Connection,
16 protocol::{AcceptError, ProtocolHandler},
17 Endpoint, EndpointAddr, EndpointId, PublicKey, RelayUrl, Watcher,
18};
19use irpc::WithChannels;
20use n0_error::{e, stack_error};
21use n0_future::{
22 task::{self, AbortOnDropHandle, JoinSet},
23 time::Instant,
24 Stream, StreamExt as _,
25};
26use rand::{rngs::StdRng, SeedableRng};
27use serde::{Deserialize, Serialize};
28use tokio::sync::{broadcast, mpsc, oneshot};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, error_span, trace, warn, Instrument};
31
32use self::{
33 discovery::GossipDiscovery,
34 util::{RecvLoop, SendLoop, Timers},
35};
36use crate::{
37 api::{self, Command, Event, GossipApi, RpcMessage},
38 metrics::Metrics,
39 proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
40};
41
42mod discovery;
43mod util;
44
/// Default ALPN protocol identifier for gossip connections.
///
/// Used for outgoing dials unless a custom ALPN is set via [`Builder::alpn`].
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

/// Capacity of the per-connection queue of outgoing protocol messages.
const SEND_QUEUE_CAP: usize = 64;
/// Capacity of the RPC channel from API handles into the actor.
const TO_ACTOR_CAP: usize = 64;
/// Capacity of the channel carrying inbound protocol events from connection tasks to the actor.
const IN_EVENT_CAP: usize = 1024;
/// Capacity of each topic's broadcast channel of events towards subscribers.
const TOPIC_EVENT_CAP: usize = 256;

/// Protocol event, with peers identified by their [`PublicKey`].
pub type ProtoEvent = proto::Event<PublicKey>;
/// Protocol command, with peers identified by their [`PublicKey`].
pub type ProtoCommand = proto::Command<PublicKey>;

// Internal aliases of the protocol types, specialised to `PublicKey` peer ids.
type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;
66
/// Handle to a running gossip protocol instance.
///
/// Clones share the same state through an [`Arc`]. The handle dereferences to
/// [`GossipApi`] for subscribing to topics. It also implements [`ProtocolHandler`]
/// so that it can accept incoming connections.
///
/// The background actor task is held by an abort-on-drop handle in the shared
/// state, so it is aborted once the last clone is dropped.
#[derive(Debug, Clone)]
pub struct Gossip {
    pub(crate) inner: Arc<Inner>,
}
89
90impl std::ops::Deref for Gossip {
91 type Target = GossipApi;
92 fn deref(&self) -> &Self::Target {
93 &self.inner.api
94 }
95}
96
/// Control messages sent from [`Gossip`] handles to the actor, outside the RPC API.
#[derive(Debug)]
enum LocalActorMessage {
    /// An accepted incoming connection that the actor should take over.
    HandleConnection(Connection),
    /// Quit all topics and stop the actor. `reply` is signalled once this is done.
    Shutdown { reply: oneshot::Sender<()> },
}
102
/// Error returned by [`Gossip`] methods that talk to the actor.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum Error {
    // The actor has stopped: a message could not be delivered to it, or it
    // dropped the reply channel without answering.
    ActorDropped {},
}
109
110impl<T> From<mpsc::error::SendError<T>> for Error {
111 fn from(_value: mpsc::error::SendError<T>) -> Self {
112 e!(Error::ActorDropped)
113 }
114}
115impl From<oneshot::error::RecvError> for Error {
116 fn from(_value: oneshot::error::RecvError) -> Self {
117 e!(Error::ActorDropped)
118 }
119}
120
/// State shared by all clones of a [`Gossip`] handle.
#[derive(Debug)]
pub(crate) struct Inner {
    /// API client that sends RPC messages to the actor.
    api: GossipApi,
    /// Channel for local control messages (accepted connections, shutdown).
    local_tx: mpsc::Sender<LocalActorMessage>,
    /// Handle of the actor task; dropping it aborts the actor.
    _actor_handle: AbortOnDropHandle<()>,
    /// Maximum message size, read from the protocol state at spawn time.
    max_message_size: usize,
    /// Metrics shared with the actor.
    metrics: Arc<Metrics>,
}
129
130impl ProtocolHandler for Gossip {
131 async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
132 self.handle_connection(connection)
133 .await
134 .map_err(AcceptError::from_err)?;
135 Ok(())
136 }
137
138 async fn shutdown(&self) {
139 if let Err(err) = self.shutdown().await {
140 warn!("error while shutting down gossip: {err:#}");
141 }
142 }
143}
144
/// Builder for configuring and spawning a [`Gossip`] instance.
#[derive(Debug, Clone)]
pub struct Builder {
    /// Protocol configuration passed to the actor.
    config: proto::Config,
    /// Custom ALPN for outgoing dials; [`GOSSIP_ALPN`] is used when `None`.
    alpn: Option<Bytes>,
}
151
152impl Builder {
153 pub fn max_message_size(mut self, size: usize) -> Self {
156 self.config.max_message_size = size;
157 self
158 }
159
160 pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
162 self.config.membership = config;
163 self
164 }
165
166 pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
168 self.config.broadcast = config;
169 self
170 }
171
172 pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
180 self.alpn = Some(alpn.as_ref().to_vec().into());
181 self
182 }
183
184 pub fn spawn(self, endpoint: Endpoint) -> Gossip {
186 let metrics = Arc::new(Metrics::default());
187 let discovery = GossipDiscovery::default();
188 endpoint.discovery().add(discovery.clone());
189 let (actor, rpc_tx, local_tx) =
190 Actor::new(endpoint, self.config, metrics.clone(), self.alpn, discovery);
191 let me = actor.endpoint.id().fmt_short();
192 let max_message_size = actor.state.max_message_size();
193
194 let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));
195
196 let api = GossipApi::local(rpc_tx);
197
198 Gossip {
199 inner: Inner {
200 api,
201 local_tx,
202 _actor_handle: AbortOnDropHandle::new(actor_handle),
203 max_message_size,
204 metrics,
205 }
206 .into(),
207 }
208 }
209}
210
211impl Gossip {
212 pub fn builder() -> Builder {
214 Builder {
215 config: Default::default(),
216 alpn: None,
217 }
218 }
219
220 #[cfg(feature = "rpc")]
222 pub async fn listen(self, endpoint: quinn::Endpoint) {
223 self.inner.api.listen(endpoint).await
224 }
225
226 pub fn max_message_size(&self) -> usize {
228 self.inner.max_message_size
229 }
230
231 pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
235 self.inner
236 .local_tx
237 .send(LocalActorMessage::HandleConnection(conn))
238 .await?;
239 Ok(())
240 }
241
242 pub async fn shutdown(&self) -> Result<(), Error> {
247 let (reply, reply_rx) = oneshot::channel();
248 self.inner
249 .local_tx
250 .send(LocalActorMessage::Shutdown { reply })
251 .await?;
252 reply_rx.await?;
253 Ok(())
254 }
255
256 pub fn metrics(&self) -> &Arc<Metrics> {
258 &self.inner.metrics
259 }
260}
261
/// Actor that owns the protocol state and all gossip networking.
///
/// It is driven by [`Actor::run`], which repeatedly calls `event_loop`.
struct Actor {
    /// ALPN used when dialing peers.
    alpn: Bytes,
    /// Protocol state, with peers identified by [`PublicKey`].
    state: proto::State<PublicKey, StdRng>,
    /// Endpoint used for dialing and for observing our own address.
    endpoint: Endpoint,
    /// Manages outgoing connection attempts.
    dialer: Dialer,
    /// API requests from [`GossipApi`] clients.
    rpc_rx: mpsc::Receiver<RpcMessage>,
    /// Local control messages (accepted connections, shutdown).
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Cloned into each connection task to forward inbound protocol events.
    /// Because the actor keeps this sender, `in_event_rx` never closes while the actor lives.
    in_event_tx: mpsc::Sender<InEvent>,
    /// Inbound protocol events from connection tasks.
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Scheduled protocol timers.
    timers: Timers<Timer>,
    /// State of every joined topic.
    topics: HashMap<TopicId, TopicState>,
    /// Connection state per remote peer.
    peers: HashMap<EndpointId, PeerState>,
    /// All topic command streams merged into one; each stream is identified by its key.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Topics scheduled to be quit by `process_quit_queue`.
    quit_queue: VecDeque<TopicId>,
    /// Running connection loops; each yields its peer, connection and result when done.
    connection_tasks: JoinSet<(EndpointId, Connection, Result<(), ConnectionLoopError>)>,
    /// Metrics shared with the [`Gossip`] handles.
    metrics: Arc<Metrics>,
    /// Tasks forwarding topic events to subscribers; each yields its topic id when done.
    topic_event_forwarders: JoinSet<TopicId>,
    /// Address lookup fed with peer addresses learned through gossip peer data.
    discovery: GossipDiscovery,
}
294
impl Actor {
    /// Creates the actor together with the sending halves of its input channels.
    ///
    /// Returns `(actor, rpc_tx, local_tx)`:
    /// - `rpc_tx` feeds [`RpcMessage`]s (API requests);
    /// - `local_tx` feeds [`LocalActorMessage`]s (accepted connections, shutdown).
    ///
    /// If `alpn` is `None`, [`GOSSIP_ALPN`] is used for outgoing dials.
    fn new(
        endpoint: Endpoint,
        config: proto::Config,
        metrics: Arc<Metrics>,
        alpn: Option<Bytes>,
        discovery: GossipDiscovery,
    ) -> (
        Self,
        mpsc::Sender<RpcMessage>,
        mpsc::Sender<LocalActorMessage>,
    ) {
        let peer_id = endpoint.id();
        let dialer = Dialer::new(endpoint.clone());
        let state = proto::State::new(
            peer_id,
            Default::default(),
            config,
            rand::rngs::StdRng::from_rng(&mut rand::rng()),
        );
        let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
        let (local_tx, local_rx) = mpsc::channel(16);
        let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);

        let actor = Actor {
            alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
            endpoint,
            state,
            dialer,
            rpc_rx,
            in_event_rx,
            in_event_tx,
            timers: Timers::new(),
            command_rx: StreamGroup::new().keyed(),
            peers: Default::default(),
            topics: Default::default(),
            quit_queue: Default::default(),
            connection_tasks: Default::default(),
            metrics,
            local_rx,
            topic_event_forwarders: Default::default(),
            discovery,
        };

        (actor, rpc_tx, local_tx)
    }

    /// Runs the actor until `event_loop` reports that it should stop.
    pub async fn run(mut self) {
        let mut addr_update_stream = self.setup().await;

        // `i` only numbers loop iterations for trace logging.
        let mut i = 0;
        while self.event_loop(&mut addr_update_stream, i).await {
            i += 1;
        }
    }

    /// Publishes our current address as peer data and returns a stream of
    /// subsequent address updates from the endpoint.
    async fn setup(&mut self) -> impl Stream<Item = EndpointAddr> + Send + Unpin + use<> {
        // NOTE(review): the watcher is created before reading the initial address,
        // presumably so that an update between the two calls is not lost.
        let addr_update_stream = self.endpoint.watch_addr().stream();
        let initial_addr = self.endpoint.addr();
        self.handle_addr_update(initial_addr).await;
        addr_update_stream
    }

    /// Runs a single iteration of the actor loop.
    ///
    /// Waits for the next input from any source and handles it. Because of
    /// `biased;`, ready sources are taken in declaration order:
    ///
    /// 1. local control messages
    /// 2. API requests
    /// 3. topic commands
    /// 4. address updates
    /// 5. dial results
    /// 6. inbound protocol events
    /// 7. timers
    /// 8. finished connection tasks
    /// 9. finished topic forwarders
    ///
    /// Returns `false` when the actor should stop (shutdown requested, or the
    /// local or RPC channel closed), `true` otherwise. `i` is only used for logging.
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = EndpointAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        tokio::select! {
            biased;
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        // Quit every joined topic before acknowledging the shutdown.
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        // The requester may have stopped waiting; that is fine.
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        self.handle_connection(conn.remote_id(), ConnOrigin::Accept, conn);
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            // The guard skips polling the stream group when it holds no streams.
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        // Only report a disconnect if the peer did not become active
                        // through some other connection in the meantime.
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    None => {
                        // The dial was cancelled through its cancellation token.
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                // Fire every timer that is due, all with the same `now`.
                let now = Instant::now();
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                // A forwarder ended (its subscriber is gone); quit the topic if no
                // one else needs it.
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }

    /// Encodes our address as peer data and feeds it into the protocol state.
    async fn handle_addr_update(&mut self, endpoint_addr: EndpointAddr) {
        let peer_data = encode_peer_data(&endpoint_addr.into());
        self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
            .await
    }

    /// Handles one item from a topic's command stream.
    ///
    /// `Some(command)` is translated into a protocol command. `None` means the
    /// stream identified by `key` has ended. Its key is then forgotten, and the
    /// topic is quit if it is no longer needed.
    async fn handle_command(
        &mut self,
        topic: TopicId,
        key: stream_group::Key,
        command: Option<Command>,
    ) {
        debug!(?topic, ?key, ?command, "handle command");
        let Some(state) = self.topics.get_mut(&topic) else {
            // TODO: unreachable?
            warn!("received command for unknown topic");
            return;
        };
        match command {
            Some(command) => {
                let command = match command {
                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
                    Command::BroadcastNeighbors(message) => {
                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                    }
                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
                };
                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
                    .await;
            }
            None => {
                state.command_rx_keys.remove(&key);
                if !state.still_needed() {
                    self.quit_queue.push_back(topic);
                    self.process_quit_queue().await;
                }
            }
        }
    }

    /// Registers a new connection to `peer_id` and spawns its connection loop.
    ///
    /// The new connection becomes the active one for sending. Messages that were
    /// queued while the peer was pending are handed to the new connection loop.
    fn handle_connection(&mut self, peer_id: EndpointId, origin: ConnOrigin, conn: Connection) {
        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
        let conn_id = conn.stable_id();

        let queue = match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
            Entry::Vacant(entry) => {
                entry.insert(PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                });
                Vec::new()
            }
        };

        let max_message_size = self.state.max_message_size();
        let in_event_tx = self.in_event_tx.clone();

        self.connection_tasks.spawn(
            // The task returns the connection so the actor can inspect and close it
            // once the loop ends.
            async move {
                let res = connection_loop(
                    peer_id,
                    conn.clone(),
                    origin,
                    send_rx,
                    in_event_tx,
                    max_message_size,
                    queue,
                )
                .await;
                (peer_id, conn, res)
            }
            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
        );
    }

    /// Cleans up after a connection loop has ended.
    ///
    /// Closes the connection if it is still open. If it was the peer's active
    /// connection, the peer is reported as disconnected to the protocol state;
    /// otherwise the connection is only removed from the peer's other connections.
    #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
    async fn handle_connection_task_finished(
        &mut self,
        peer_id: EndpointId,
        conn: Connection,
        task_result: Result<(), ConnectionLoopError>,
    ) {
        if conn.close_reason().is_none() {
            conn.close(0u32.into(), b"close from disconnect");
        }
        let reason = conn.close_reason().expect("just closed");
        let error = task_result.err();
        debug!(%reason, ?error, "connection closed");
        if let Some(PeerState::Active {
            active_conn_id,
            other_conns,
            ..
        }) = self.peers.get_mut(&peer_id)
        {
            if conn.stable_id() == *active_conn_id {
                debug!("active send connection closed, mark peer as disconnected");
                self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                    .await;
            } else {
                other_conns.retain(|x| *x != conn.stable_id());
                // NOTE(review): the `+ 1` also counts the still-active connection.
                debug!("remaining {} other connections", other_conns.len() + 1);
            }
        } else {
            debug!("peer already marked as disconnected");
        }
    }

    /// Handles an API request.
    ///
    /// For `Join`, this:
    /// 1. creates the topic state if needed;
    /// 2. replays already-known neighbors to the new subscriber;
    /// 3. spawns a forwarder that relays topic events to it;
    /// 4. registers its command stream;
    /// 5. issues a protocol `Join` with the bootstrap peers.
    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
        trace!("handle to_actor {msg:?}");
        match msg {
            RpcMessage::Join(msg) => {
                let WithChannels {
                    inner,
                    rx,
                    tx,
                    span: _,
                } = msg;
                let api::JoinRequest {
                    topic_id,
                    bootstrap,
                } = inner;
                let TopicState {
                    neighbors,
                    event_sender,
                    command_rx_keys,
                } = self.topics.entry(topic_id).or_default();
                let mut sender_dead = false;
                if !neighbors.is_empty() {
                    for neighbor in neighbors.iter() {
                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
                            sender_dead = true;
                            break;
                        }
                    }
                }

                // If the replay already failed, the subscriber is gone and no
                // forwarder is spawned. The command stream is registered regardless.
                if !sender_dead {
                    let fut =
                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
                    self.topic_event_forwarders
                        .spawn(fut.instrument(tracing::Span::current()));
                }
                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
                let key = self.command_rx.insert(command_rx);
                command_rx_keys.insert(key);

                self.handle_in_event(
                    InEvent::Command(
                        topic_id,
                        ProtoCommand::Join(bootstrap.into_iter().collect()),
                    ),
                    now,
                )
                .await;
            }
        }
    }

    /// Feeds an event into the protocol state, then quits any topics that were
    /// queued for quitting while handling it.
    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }

    /// Quits every queued topic and drops its state.
    ///
    /// Uses `handle_in_event_inner` directly, so quitting does not recurse back
    /// into this function.
    async fn process_quit_queue(&mut self) {
        while let Some(topic_id) = self.quit_queue.pop_front() {
            self.handle_in_event_inner(
                InEvent::Command(topic_id, ProtoCommand::Quit),
                Instant::now(),
            )
            .await;
            if self.topics.remove(&topic_id).is_some() {
                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
            }
        }
    }

    /// Feeds an event into the protocol state and executes the resulting output events.
    ///
    /// Topics that become unneeded are only pushed to `quit_queue`; the caller is
    /// responsible for processing the queue.
    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        // Timer events are frequent, so they are logged at trace level only.
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            // The first queued message for a pending peer triggers the dial;
                            // later ones just wait in the queue.
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        // TODO: unreachable?
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    // Keep the neighbor set current so it can be replayed to new subscribers.
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    // Sending fails only when there are no receivers; that is not an error here.
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    // Dropping the peer state also drops its send queue sender.
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(endpoint_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {endpoint_id}: {err}"),
                    Ok(info) => {
                        // Make addresses learned via gossip available to the endpoint's discovery.
                        debug!(peer = ?endpoint_id, "add known addrs: {info:?}");
                        let mut endpoint_addr = EndpointAddr::new(endpoint_id);
                        for addr in info.direct_addresses {
                            endpoint_addr = endpoint_addr.with_ip_addr(addr);
                        }
                        if let Some(relay_url) = info.relay_url {
                            endpoint_addr = endpoint_addr.with_relay_url(relay_url);
                        }

                        self.discovery.add(endpoint_addr);
                    }
                },
            }
        }
    }
}
741
/// Identifier of a connection, as returned by `Connection::stable_id`.
type ConnId = usize;

/// Connection state of one remote peer, as tracked by the actor.
#[derive(Debug)]
enum PeerState {
    /// No connection yet. Outgoing messages are queued until a connection is established.
    Pending {
        /// Messages to hand to the first connection.
        queue: Vec<ProtoMessage>,
    },
    /// At least one connection exists.
    Active {
        /// Send queue of the connection used for outgoing messages.
        active_send_tx: mpsc::Sender<ProtoMessage>,
        /// Id of the connection used for outgoing messages.
        active_conn_id: ConnId,
        /// Ids of earlier connections to this peer whose loops may still be running.
        other_conns: Vec<ConnId>,
    },
}
755
756impl PeerState {
757 fn accept_conn(
758 &mut self,
759 send_tx: mpsc::Sender<ProtoMessage>,
760 conn_id: ConnId,
761 ) -> Vec<ProtoMessage> {
762 match self {
763 PeerState::Pending { queue } => {
764 let queue = std::mem::take(queue);
765 *self = PeerState::Active {
766 active_send_tx: send_tx,
767 active_conn_id: conn_id,
768 other_conns: Vec::new(),
769 };
770 queue
771 }
772 PeerState::Active {
773 active_send_tx,
774 active_conn_id,
775 other_conns,
776 } => {
777 other_conns.push(*active_conn_id);
783 *active_send_tx = send_tx;
784 *active_conn_id = conn_id;
785 Vec::new()
786 }
787 }
788 }
789}
790
791impl Default for PeerState {
792 fn default() -> Self {
793 PeerState::Pending { queue: Vec::new() }
794 }
795}
796
/// Per-topic state held by the actor.
#[derive(Debug)]
struct TopicState {
    /// Current neighbors in this topic; replayed to new subscribers as `NeighborUp` events.
    neighbors: BTreeSet<EndpointId>,
    /// Fans protocol events out to the forwarder task of each subscriber.
    event_sender: broadcast::Sender<ProtoEvent>,
    /// Keys, within the actor's command stream group, of this topic's live command streams.
    command_rx_keys: HashSet<stream_group::Key>,
}
806
807impl Default for TopicState {
808 fn default() -> Self {
809 let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
810 Self {
811 neighbors: Default::default(),
812 command_rx_keys: Default::default(),
813 event_sender,
814 }
815 }
816}
817
818impl TopicState {
819 fn still_needed(&self) -> bool {
821 !self.command_rx_keys.is_empty() && self.event_sender.receiver_count() > 0
822 }
823
824 #[cfg(test)]
825 fn joined(&self) -> bool {
826 !self.neighbors.is_empty()
827 }
828}
829
/// Whether a connection was accepted from a remote peer or dialed by us.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    /// Incoming connection handed to the actor via [`LocalActorMessage::HandleConnection`].
    Accept,
    /// Outgoing connection established by the [`Dialer`].
    Dial,
}
836
/// Error that ended a connection loop.
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
enum ConnectionLoopError {
    // Writing to the connection failed.
    #[error(transparent)]
    Write {
        source: self::util::WriteError,
    },
    // Reading from the connection failed.
    #[error(transparent)]
    Read {
        source: self::util::ReadError,
    },
    // The connection itself reported an error.
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: iroh::endpoint::ConnectionError,
    },
    // A channel to the actor was closed.
    ActorDropped {},
}
856
857impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
858 fn from(_value: mpsc::error::SendError<T>) -> Self {
859 e!(ConnectionLoopError::ActorDropped)
860 }
861}
862
863async fn connection_loop(
864 from: PublicKey,
865 conn: Connection,
866 origin: ConnOrigin,
867 send_rx: mpsc::Receiver<ProtoMessage>,
868 in_event_tx: mpsc::Sender<InEvent>,
869 max_message_size: usize,
870 queue: Vec<ProtoMessage>,
871) -> Result<(), ConnectionLoopError> {
872 debug!(?origin, "connection established");
873
874 let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
875 let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);
876
877 let send_fut = send_loop.run(queue).instrument(error_span!("send"));
878 let recv_fut = recv_loop.run().instrument(error_span!("recv"));
879
880 let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
881 send_res?;
882 recv_res?;
883 Ok(())
884}
885
886#[derive(Default, Debug, Clone, Serialize, Deserialize)]
887struct AddrInfo {
888 relay_url: Option<RelayUrl>,
889 direct_addresses: BTreeSet<SocketAddr>,
890}
891
892impl From<EndpointAddr> for AddrInfo {
893 fn from(endpoint_addr: EndpointAddr) -> Self {
894 Self {
895 relay_url: endpoint_addr.relay_urls().next().cloned(),
896 direct_addresses: endpoint_addr.ip_addrs().cloned().collect(),
897 }
898 }
899}
900
901fn encode_peer_data(info: &AddrInfo) -> PeerData {
902 let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo may not fail");
903 PeerData::new(bytes)
904}
905
906fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
907 let bytes = peer_data.as_bytes();
908 if bytes.is_empty() {
909 return Ok(AddrInfo::default());
910 }
911 let info = postcard::from_bytes(bytes)?;
912 Ok(info)
913}
914
915async fn topic_subscriber_loop(
916 sender: irpc::channel::mpsc::Sender<Event>,
917 mut topic_events: broadcast::Receiver<ProtoEvent>,
918) {
919 loop {
920 tokio::select! {
921 biased;
922 msg = topic_events.recv() => {
923 let event = match msg {
924 Err(broadcast::error::RecvError::Closed) => break,
925 Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
926 Ok(event) => event.into(),
927 };
928 if sender.send(event).await.is_err() {
929 break;
930 }
931 }
932 _ = sender.closed() => break,
933 }
934 }
935}
936
937type BoxedCommandReceiver =
939 n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;
940
941#[derive(derive_more::Debug)]
942struct TopicCommandStream {
943 topic_id: TopicId,
944 #[debug("CommandStream")]
945 stream: BoxedCommandReceiver,
946 closed: bool,
947}
948
949impl TopicCommandStream {
950 fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
951 Self {
952 topic_id,
953 stream,
954 closed: false,
955 }
956 }
957}
958
959impl Stream for TopicCommandStream {
960 type Item = (TopicId, Option<Command>);
961 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
962 if self.closed {
963 return Poll::Ready(None);
964 }
965 match Pin::new(&mut self.stream).poll_next(cx) {
966 Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
967 Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
968 self.closed = true;
969 Poll::Ready(Some((self.topic_id, None)))
970 }
971 Poll::Pending => Poll::Pending,
972 }
973 }
974}
975
976#[derive(Debug)]
977struct Dialer {
978 endpoint: Endpoint,
979 pending: JoinSet<(
980 EndpointId,
981 Option<Result<Connection, iroh::endpoint::ConnectError>>,
982 )>,
983 pending_dials: HashMap<EndpointId, CancellationToken>,
984}
985
986impl Dialer {
987 fn new(endpoint: Endpoint) -> Self {
989 Self {
990 endpoint,
991 pending: Default::default(),
992 pending_dials: Default::default(),
993 }
994 }
995
996 fn queue_dial(&mut self, endpoint_id: EndpointId, alpn: Bytes) {
998 if self.is_pending(endpoint_id) {
999 return;
1000 }
1001 let cancel = CancellationToken::new();
1002 self.pending_dials.insert(endpoint_id, cancel.clone());
1003 let endpoint = self.endpoint.clone();
1004 self.pending.spawn(
1005 async move {
1006 let res = tokio::select! {
1007 biased;
1008 _ = cancel.cancelled() => None,
1009 res = endpoint.connect(endpoint_id, &alpn) => Some(res),
1010 };
1011 (endpoint_id, res)
1012 }
1013 .instrument(tracing::Span::current()),
1014 );
1015 }
1016
1017 fn is_pending(&self, endpoint: EndpointId) -> bool {
1019 self.pending_dials.contains_key(&endpoint)
1020 }
1021
1022 async fn next_conn(
1025 &mut self,
1026 ) -> (
1027 EndpointId,
1028 Option<Result<Connection, iroh::endpoint::ConnectError>>,
1029 ) {
1030 match self.pending_dials.is_empty() {
1031 false => {
1032 let (endpoint_id, res) = loop {
1033 match self.pending.join_next().await {
1034 Some(Ok((endpoint_id, res))) => {
1035 self.pending_dials.remove(&endpoint_id);
1036 break (endpoint_id, res);
1037 }
1038 Some(Err(e)) => {
1039 error!("next conn error: {:?}", e);
1040 }
1041 None => {
1042 error!("no more pending conns available");
1043 std::future::pending().await
1044 }
1045 }
1046 };
1047
1048 (endpoint_id, res)
1049 }
1050 true => std::future::pending().await,
1051 }
1052 }
1053}
1054
1055#[cfg(test)]
1056pub(crate) mod test {
1057 use std::time::Duration;
1058
1059 use bytes::Bytes;
1060 use futures_concurrency::future::TryJoin;
1061 use iroh::{
1062 discovery::static_provider::StaticProvider, endpoint::BindError, protocol::Router,
1063 RelayMap, RelayMode, SecretKey,
1064 };
1065 use n0_error::{AnyError, Result, StdResultExt};
1066 use rand::{CryptoRng, Rng};
1067 use tokio::{spawn, time::timeout};
1068 use tokio_util::sync::CancellationToken;
1069 use tracing::{info, instrument};
1070 use tracing_test::traced_test;
1071
1072 use super::*;
1073 use crate::api::{ApiError, GossipReceiver, GossipSender};
1074
1075 struct ManualActorLoop {
1076 actor: Actor,
1077 step: usize,
1078 }
1079
1080 impl std::ops::Deref for ManualActorLoop {
1081 type Target = Actor;
1082
1083 fn deref(&self) -> &Self::Target {
1084 &self.actor
1085 }
1086 }
1087
1088 impl std::ops::DerefMut for ManualActorLoop {
1089 fn deref_mut(&mut self) -> &mut Self::Target {
1090 &mut self.actor
1091 }
1092 }
1093
1094 type EndpointHandle = tokio::task::JoinHandle<Result<()>>;
1095
1096 impl ManualActorLoop {
1097 #[instrument(skip_all, fields(me = %actor.endpoint.id().fmt_short()))]
1098 async fn new(mut actor: Actor) -> Self {
1099 let _ = actor.setup().await;
1100 Self { actor, step: 0 }
1101 }
1102
1103 #[instrument(skip_all, fields(me = %self.endpoint.id().fmt_short()))]
1104 async fn step(&mut self) -> bool {
1105 let ManualActorLoop { actor, step } = self;
1106 *step += 1;
1107 let addr_update_stream = &mut futures_lite::stream::pending();
1110 actor.event_loop(addr_update_stream, *step).await
1111 }
1112
1113 async fn steps(&mut self, n: usize) {
1114 for _ in 0..n {
1115 self.step().await;
1116 }
1117 }
1118
1119 async fn finish(mut self) {
1120 while self.step().await {}
1121 }
1122 }
1123
1124 impl Gossip {
1125 async fn t_new_with_actor(
1132 rng: &mut rand_chacha::ChaCha12Rng,
1133 config: proto::Config,
1134 relay_map: RelayMap,
1135 cancel: &CancellationToken,
1136 ) -> Result<(Self, Actor, EndpointHandle), BindError> {
1137 let endpoint = create_endpoint(rng, relay_map, None).await?;
1138 let metrics = Arc::new(Metrics::default());
1139 let discovery = GossipDiscovery::default();
1140 endpoint.discovery().add(discovery.clone());
1141
1142 let (actor, to_actor_tx, conn_tx) =
1143 Actor::new(endpoint, config, metrics.clone(), None, discovery);
1144 let max_message_size = actor.state.max_message_size();
1145
1146 let _actor_handle =
1147 AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
1148 let gossip = Self {
1149 inner: Inner {
1150 api: GossipApi::local(to_actor_tx),
1151 local_tx: conn_tx,
1152 _actor_handle,
1153 max_message_size,
1154 metrics,
1155 }
1156 .into(),
1157 };
1158
1159 let endpoint_task = task::spawn(endpoint_loop(
1160 actor.endpoint.clone(),
1161 gossip.clone(),
1162 cancel.child_token(),
1163 ));
1164
1165 Ok((gossip, actor, endpoint_task))
1166 }
1167
1168 async fn t_new(
1170 rng: &mut rand_chacha::ChaCha12Rng,
1171 config: proto::Config,
1172 relay_map: RelayMap,
1173 cancel: &CancellationToken,
1174 ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
1175 let (g, actor, ep_handle) =
1176 Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
1177 let ep = actor.endpoint.clone();
1178 let me = ep.id().fmt_short();
1179 let actor_handle =
1180 task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
1181 Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
1182 }
1183 }
1184
1185 pub(crate) async fn create_endpoint(
1186 rng: &mut rand_chacha::ChaCha12Rng,
1187 relay_map: RelayMap,
1188 static_provider: Option<StaticProvider>,
1189 ) -> Result<Endpoint, BindError> {
1190 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1191 .secret_key(SecretKey::generate(rng))
1192 .alpns(vec![GOSSIP_ALPN.to_vec()])
1193 .insecure_skip_relay_cert_verify(true)
1194 .bind()
1195 .await?;
1196
1197 if let Some(static_provider) = static_provider {
1198 ep.discovery().add(static_provider);
1199 }
1200 ep.online().await;
1201 Ok(ep)
1202 }
1203
1204 async fn endpoint_loop(
1205 endpoint: Endpoint,
1206 gossip: Gossip,
1207 cancel: CancellationToken,
1208 ) -> Result<()> {
1209 loop {
1210 tokio::select! {
1211 biased;
1212 _ = cancel.cancelled() => break,
1213 incoming = endpoint.accept() => match incoming {
1214 None => break,
1215 Some(incoming) => {
1216 let connecting = match incoming.accept() {
1217 Ok(connecting) => connecting,
1218 Err(err) => {
1219 warn!("incoming connection failed: {err:#}");
1220 continue;
1223 }
1224 };
1225 let connection = connecting
1226 .await
1227 .std_context("await incoming connection")?;
1228 gossip.handle_connection(connection).await?
1229 }
1230 }
1231 }
1232 }
1233 Ok(())
1234 }
1235
1236 #[tokio::test]
1237 #[traced_test]
1238 async fn gossip_net_smoke() {
1239 let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
1240 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1241
1242 let static_provider = StaticProvider::new();
1243
1244 let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
1245 .await
1246 .unwrap();
1247 let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
1248 .await
1249 .unwrap();
1250 let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
1251 .await
1252 .unwrap();
1253
1254 let go1 = Gossip::builder().spawn(ep1.clone());
1255 let go2 = Gossip::builder().spawn(ep2.clone());
1256 let go3 = Gossip::builder().spawn(ep3.clone());
1257 debug!("peer1 {:?}", ep1.id());
1258 debug!("peer2 {:?}", ep2.id());
1259 debug!("peer3 {:?}", ep3.id());
1260 let pi1 = ep1.id();
1261 let pi2 = ep2.id();
1262
1263 let cancel = CancellationToken::new();
1264 let tasks = [
1265 spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
1266 spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
1267 spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
1268 ];
1269
1270 debug!("----- adding peers ----- ");
1271 let topic: TopicId = blake3::hash(b"foobar").into();
1272
1273 let addr1 = EndpointAddr::new(pi1).with_relay_url(relay_url.clone());
1274 let addr2 = EndpointAddr::new(pi2).with_relay_url(relay_url);
1275 static_provider.add_endpoint_info(addr1.clone());
1276 static_provider.add_endpoint_info(addr2.clone());
1277
1278 debug!("----- joining ----- ");
1279 let [sub1, mut sub2, mut sub3] = [
1281 go1.subscribe_and_join(topic, vec![]),
1282 go2.subscribe_and_join(topic, vec![pi1]),
1283 go3.subscribe_and_join(topic, vec![pi2]),
1284 ]
1285 .try_join()
1286 .await
1287 .unwrap();
1288
1289 let (sink1, _stream1) = sub1.split();
1290
1291 let len = 2;
1292
1293 let pub1 = spawn(async move {
1295 for i in 0..len {
1296 let message = format!("hi{i}");
1297 info!("go1 broadcast: {message:?}");
1298 sink1.broadcast(message.into_bytes().into()).await.unwrap();
1299 tokio::time::sleep(Duration::from_micros(1)).await;
1300 }
1301 });
1302
1303 let sub2 = spawn(async move {
1305 let mut recv = vec![];
1306 loop {
1307 let ev = sub2.next().await.unwrap().unwrap();
1308 info!("go2 event: {ev:?}");
1309 if let Event::Received(msg) = ev {
1310 recv.push(msg.content);
1311 }
1312 if recv.len() == len {
1313 return recv;
1314 }
1315 }
1316 });
1317
1318 let sub3 = spawn(async move {
1320 let mut recv = vec![];
1321 loop {
1322 let ev = sub3.next().await.unwrap().unwrap();
1323 info!("go3 event: {ev:?}");
1324 if let Event::Received(msg) = ev {
1325 recv.push(msg.content);
1326 }
1327 if recv.len() == len {
1328 return recv;
1329 }
1330 }
1331 });
1332
1333 timeout(Duration::from_secs(10), pub1)
1334 .await
1335 .unwrap()
1336 .unwrap();
1337 let recv2 = timeout(Duration::from_secs(10), sub2)
1338 .await
1339 .unwrap()
1340 .unwrap();
1341 let recv3 = timeout(Duration::from_secs(10), sub3)
1342 .await
1343 .unwrap()
1344 .unwrap();
1345
1346 let expected: Vec<Bytes> = (0..len)
1347 .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
1348 .collect();
1349 assert_eq!(recv2, expected);
1350 assert_eq!(recv3, expected);
1351
1352 cancel.cancel();
1353 for t in tasks {
1354 timeout(Duration::from_secs(10), t)
1355 .await
1356 .unwrap()
1357 .unwrap()
1358 .unwrap();
1359 }
1360 }
1361
1362 #[tokio::test]
1372 #[traced_test]
1373 async fn subscription_cleanup() -> Result {
1374 let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1375 let ct = CancellationToken::new();
1376 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1377
1378 let (go1, actor, ep1_handle) =
1380 Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
1381 let mut actor = ManualActorLoop::new(actor).await;
1382
1383 let (go2, ep2, ep2_handle, _test_actor_handle) =
1385 Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1386
1387 let endpoint_id1 = actor.endpoint.id();
1388 let endpoint_id2 = ep2.id();
1389 tracing::info!(
1390 endpoint_1 = %endpoint_id1.fmt_short(),
1391 endpoint_2 = %endpoint_id2.fmt_short(),
1392 "endpoints ready"
1393 );
1394
1395 let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
1396 tracing::info!(%topic, "joining");
1397
1398 let ct2 = ct.clone();
1405 let go2_task = async move {
1406 let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();
1407
1408 let subscribe_fut = async {
1409 while let Some(ev) = sub_rx.try_next().await? {
1410 match ev {
1411 Event::Lagged => tracing::debug!("missed some messages :("),
1412 Event::Received(_) => unreachable!("test does not send messages"),
1413 other => tracing::debug!(?other, "gs event"),
1414 }
1415 }
1416
1417 tracing::debug!("subscribe stream ended");
1418 Ok::<_, AnyError>(())
1419 };
1420
1421 tokio::select! {
1422 _ = ct2.cancelled() => Ok(()),
1423 res = subscribe_fut => res,
1424 }
1425 }
1426 .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
1427 let go2_handle = task::spawn(go2_task);
1428
1429 let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
1431 let static_provider = StaticProvider::new();
1432 static_provider.add_endpoint_info(addr2);
1433 actor.endpoint.discovery().add(static_provider);
1434 let (tx, mut rx) = mpsc::channel::<()>(1);
1436 let ct1 = ct.clone();
1437 let go1_task = async move {
1438 tracing::info!("subscribing the first time");
1440 let sub_1a = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1441
1442 rx.recv().await.expect("signal for second subscribe");
1444 tracing::info!("subscribing a second time");
1445 let sub_1b = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1446 drop(sub_1a);
1447
1448 rx.recv().await.expect("signal for second subscribe");
1450 tracing::info!("dropping all handles");
1451 drop(sub_1b);
1452
1453 ct1.cancelled().await;
1455 drop(go1);
1456
1457 Ok::<_, AnyError>(())
1458 }
1459 .instrument(tracing::debug_span!("endpoint_1", %endpoint_id1));
1460 let go1_handle = task::spawn(go1_task);
1461
1462 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1467 assert!(state.joined());
1468
1469 tx.send(())
1471 .await
1472 .std_context("signal additional subscribe")?;
1473 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1475 assert!(state.joined());
1476
1477 tx.send(()).await.std_context("signal drop handles")?;
1479 actor.steps(2).await; assert!(!actor.topics.contains_key(&topic));
1481
1482 ct.cancel();
1484 let wait = Duration::from_secs(2);
1485 timeout(wait, ep1_handle)
1486 .await
1487 .std_context("wait endpoint1 task")?
1488 .std_context("join endpoint1 task")??;
1489 timeout(wait, ep2_handle)
1490 .await
1491 .std_context("wait endpoint2 task")?
1492 .std_context("join endpoint2 task")??;
1493 timeout(wait, go1_handle)
1494 .await
1495 .std_context("wait gossip1 task")?
1496 .std_context("join gossip1 task")??;
1497 timeout(wait, go2_handle)
1498 .await
1499 .std_context("wait gossip2 task")?
1500 .std_context("join gossip2 task")??;
1501 timeout(wait, actor.finish())
1502 .await
1503 .std_context("wait actor finish")?;
1504
1505 Ok(())
1506 }
1507
1508 #[tokio::test]
1515 #[traced_test]
1516 async fn can_reconnect() -> Result {
1517 let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1518 let ct = CancellationToken::new();
1519 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1520
1521 let (go1, ep1, ep1_handle, _test_actor_handle1) =
1522 Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;
1523
1524 let (go2, ep2, ep2_handle, _test_actor_handle2) =
1525 Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1526
1527 let endpoint_id1 = ep1.id();
1528 let endpoint_id2 = ep2.id();
1529 tracing::info!(
1530 endpoint_1 = %endpoint_id1.fmt_short(),
1531 endpoint_2 = %endpoint_id2.fmt_short(),
1532 "endpoints ready"
1533 );
1534
1535 let topic: TopicId = blake3::hash(b"can_reconnect").into();
1536 tracing::info!(%topic, "joining");
1537
1538 let ct2 = ct.child_token();
1539 let (tx, mut rx) = mpsc::channel::<()>(1);
1541 let addr1 = EndpointAddr::new(endpoint_id1).with_relay_url(relay_url.clone());
1542 let static_provider = StaticProvider::new();
1543 static_provider.add_endpoint_info(addr1);
1544 ep2.discovery().add(static_provider.clone());
1545 let go2_task = async move {
1546 let mut sub = go2.subscribe(topic, Vec::new()).await?;
1547 sub.joined().await?;
1548
1549 rx.recv().await.expect("signal to unsubscribe");
1550 tracing::info!("unsubscribing");
1551 drop(sub);
1552
1553 rx.recv().await.expect("signal to subscribe again");
1554 tracing::info!("resubscribing");
1555 let mut sub = go2.subscribe(topic, vec![endpoint_id1]).await?;
1556
1557 sub.joined().await?;
1558 tracing::info!("subscription successful!");
1559
1560 ct2.cancelled().await;
1561
1562 Ok::<_, ApiError>(())
1563 }
1564 .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
1565 let go2_handle = task::spawn(go2_task);
1566
1567 let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
1568 static_provider.add_endpoint_info(addr2);
1569 ep1.discovery().add(static_provider);
1570
1571 let mut sub = go1.subscribe(topic, vec![endpoint_id2]).await?;
1572 sub.joined().await?;
1574
1575 tx.send(()).await.std_context("signal unsubscribe")?;
1577
1578 let conn_timeout = Duration::from_millis(500);
1580 let ev = timeout(conn_timeout, sub.try_next())
1581 .await
1582 .std_context("wait neighbor down")??;
1583 assert_eq!(ev, Some(Event::NeighborDown(endpoint_id2)));
1584 tracing::info!("endpoint 2 left");
1585
1586 tx.send(()).await.std_context("signal resubscribe")?;
1588
1589 let conn_timeout = Duration::from_millis(500);
1590 let ev = timeout(conn_timeout, sub.try_next())
1591 .await
1592 .std_context("wait neighbor up")??;
1593 assert_eq!(ev, Some(Event::NeighborUp(endpoint_id2)));
1594 tracing::info!("endpoint 2 rejoined!");
1595
1596 ct.cancel();
1598 let wait = Duration::from_secs(2);
1599 timeout(wait, ep1_handle)
1600 .await
1601 .std_context("wait endpoint1 task")?
1602 .std_context("join endpoint1 task")??;
1603 timeout(wait, ep2_handle)
1604 .await
1605 .std_context("wait endpoint2 task")?
1606 .std_context("join endpoint2 task")??;
1607 timeout(wait, go2_handle)
1608 .await
1609 .std_context("wait gossip2 task")?
1610 .std_context("join gossip2 task")??;
1611
1612 Result::Ok(())
1613 }
1614
1615 #[tokio::test]
1616 #[traced_test]
1617 async fn can_die_and_reconnect() -> Result {
1618 fn run_in_thread<T: Send + 'static>(
1621 cancel: CancellationToken,
1622 fut: impl std::future::Future<Output = T> + Send + 'static,
1623 ) -> std::thread::JoinHandle<Option<T>> {
1624 std::thread::spawn(move || {
1625 let rt = tokio::runtime::Builder::new_current_thread()
1626 .enable_all()
1627 .build()
1628 .unwrap();
1629 rt.block_on(async move { cancel.run_until_cancelled(fut).await })
1630 })
1631 }
1632
1633 async fn spawn_gossip(
1635 secret_key: SecretKey,
1636 relay_map: RelayMap,
1637 ) -> Result<(Router, Gossip), BindError> {
1638 let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
1639 .secret_key(secret_key)
1640 .insecure_skip_relay_cert_verify(true)
1641 .bind()
1642 .await?;
1643 let gossip = Gossip::builder().spawn(ep.clone());
1644 let router = Router::builder(ep)
1645 .accept(GOSSIP_ALPN, gossip.clone())
1646 .spawn();
1647 Ok((router, gossip))
1648 }
1649
1650 async fn broadcast_once(
1652 secret_key: SecretKey,
1653 relay_map: RelayMap,
1654 bootstrap_addr: EndpointAddr,
1655 topic_id: TopicId,
1656 message: String,
1657 ) -> Result {
1658 let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
1659 info!(endpoint_id = %router.endpoint().id().fmt_short(), "broadcast endpoint spawned");
1660 let bootstrap = vec![bootstrap_addr.id];
1661 let static_provider = StaticProvider::new();
1662 static_provider.add_endpoint_info(bootstrap_addr);
1663 router.endpoint().discovery().add(static_provider);
1664 let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
1665 topic.broadcast(message.as_bytes().to_vec().into()).await?;
1666 std::future::pending::<()>().await;
1667 Ok(())
1668 }
1669
1670 let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1671 let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1672 let topic_id = TopicId::from_bytes(rng.random());
1673
1674 let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
1677 let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
1678 let recv_task = tokio::task::spawn({
1679 let relay_map = relay_map.clone();
1680 let secret_key = SecretKey::generate(&mut rng);
1681 async move {
1682 let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
1683 router.endpoint().online().await;
1688 let addr = router.endpoint().addr();
1689 info!(endpoint_id = %addr.id.fmt_short(), "recv endpoint spawned");
1690 addr_tx.send(addr).unwrap();
1691 let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
1692 while let Some(event) = topic.try_next().await.unwrap() {
1693 if let Event::Received(message) = event {
1694 let message = std::str::from_utf8(&message.content)
1695 .std_context("decode broadcast message")?
1696 .to_string();
1697 msgs_recv_tx
1698 .send(message)
1699 .await
1700 .std_context("forward received message")?;
1701 }
1702 }
1703 Ok::<_, AnyError>(())
1704 }
1705 });
1706
1707 let endpoint0_addr = addr_rx.await.std_context("receive endpoint address")?;
1708 let max_wait = Duration::from_secs(5);
1709
1710 let cancel = CancellationToken::new();
1713 let secret = SecretKey::generate(&mut rng);
1714 let join_handle_1 = run_in_thread(
1715 cancel.clone(),
1716 broadcast_once(
1717 secret.clone(),
1718 relay_map.clone(),
1719 endpoint0_addr.clone(),
1720 topic_id,
1721 "msg1".to_string(),
1722 ),
1723 );
1724 let msg = timeout(max_wait, msgs_recv_rx.recv())
1726 .await
1727 .std_context("wait for first broadcast")?
1728 .std_context("receiver dropped channel")?;
1729 assert_eq!(&msg, "msg1");
1730 info!("kill broadcast endpoint");
1731 cancel.cancel();
1732
1733 let cancel = CancellationToken::new();
1735 let join_handle_2 = run_in_thread(
1736 cancel.clone(),
1737 broadcast_once(
1738 secret.clone(),
1739 relay_map.clone(),
1740 endpoint0_addr.clone(),
1741 topic_id,
1742 "msg2".to_string(),
1743 ),
1744 );
1745 let msg = timeout(max_wait, msgs_recv_rx.recv())
1748 .await
1749 .std_context("wait for second broadcast")?
1750 .std_context("receiver dropped channel")?;
1751 assert_eq!(&msg, "msg2");
1752 info!("kill broadcast endpoint");
1753 cancel.cancel();
1754
1755 info!("kill recv endpoint");
1756 recv_task.abort();
1757 assert!(join_handle_1.join().unwrap().is_none());
1758 assert!(join_handle_2.join().unwrap().is_none());
1759
1760 Ok(())
1761 }
1762
1763 #[tokio::test]
1764 #[traced_test]
1765 async fn gossip_change_alpn() -> n0_error::Result<()> {
1766 let alpn = b"my-gossip-alpn";
1767 let topic_id = TopicId::from([0u8; 32]);
1768
1769 let ep1 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
1770 let ep2 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
1771 let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
1772 let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
1773 let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
1774 let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();
1775
1776 let addr1 = router1.endpoint().addr();
1777 let id1 = addr1.id;
1778 let static_provider = StaticProvider::new();
1779 static_provider.add_endpoint_info(addr1);
1780 router2.endpoint().discovery().add(static_provider);
1781
1782 let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
1783 let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;
1784
1785 timeout(Duration::from_secs(3), topic1.joined())
1786 .await
1787 .std_context("wait topic1 join")??;
1788 timeout(Duration::from_secs(3), topic2.joined())
1789 .await
1790 .std_context("wait topic2 join")??;
1791 router1.shutdown().await.std_context("shutdown router1")?;
1792 router2.shutdown().await.std_context("shutdown router2")?;
1793 Ok(())
1794 }
1795
1796 #[tokio::test]
1797 #[traced_test]
1798 async fn gossip_rely_on_gossip_discovery() -> n0_error::Result<()> {
1799 let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1800
1801 async fn spawn(
1802 rng: &mut impl CryptoRng,
1803 ) -> n0_error::Result<(EndpointId, Router, Gossip, GossipSender, GossipReceiver)> {
1804 let topic_id = TopicId::from([0u8; 32]);
1805 let ep = Endpoint::empty_builder(RelayMode::Disabled)
1806 .secret_key(SecretKey::generate(rng))
1807 .bind()
1808 .await?;
1809 let endpoint_id = ep.id();
1810 let gossip = Gossip::builder().spawn(ep.clone());
1811 let router = Router::builder(ep)
1812 .accept(GOSSIP_ALPN, gossip.clone())
1813 .spawn();
1814 let topic = gossip.subscribe(topic_id, vec![]).await?;
1815 let (sender, receiver) = topic.split();
1816 Ok((endpoint_id, router, gossip, sender, receiver))
1817 }
1818
1819 let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
1821 let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
1822 let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;
1823
1824 println!("endpoints {:?}", [n1, n2, n3]);
1825
1826 let addr1 = r1.endpoint().addr();
1828 let disco = StaticProvider::new();
1829 disco.add_endpoint_info(addr1);
1830
1831 r2.endpoint().discovery().add(disco.clone());
1833 tx2.join_peers(vec![n1]).await?;
1834
1835 timeout(Duration::from_secs(3), rx1.joined())
1837 .await
1838 .std_context("wait rx1 join")??;
1839 timeout(Duration::from_secs(3), rx2.joined())
1840 .await
1841 .std_context("wait rx2 join")??;
1842
1843 r3.endpoint().discovery().add(disco.clone());
1845 tx3.join_peers(vec![n1]).await?;
1846
1847 let ev = timeout(Duration::from_secs(3), rx3.next())
1850 .await
1851 .std_context("wait rx3 first neighbor")?;
1852 assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
1853 let ev = timeout(Duration::from_secs(3), rx3.next())
1854 .await
1855 .std_context("wait rx3 second neighbor")?;
1856 assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
1857
1858 assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));
1859
1860 let ev = timeout(Duration::from_secs(3), rx2.next())
1861 .await
1862 .std_context("wait rx2 neighbor")?;
1863 assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));
1864
1865 let ev = timeout(Duration::from_secs(3), rx1.next())
1866 .await
1867 .std_context("wait rx1 neighbor")?;
1868 assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));
1869
1870 tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown())
1871 .std_context("shutdown routers")?;
1872 Ok(())
1873 }
1874
/// Gathers every item of `input` into a `Vec` and returns it in ascending order.
///
/// Uses a stable sort, so items that compare equal keep their input order.
fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
    let mut items = Vec::new();
    items.extend(input);
    items.sort();
    items
}
1879 }
1880}