1use std::{
4 collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
5 net::SocketAddr,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_concurrency::stream::{stream_group, StreamGroup};
13use futures_util::FutureExt as _;
14use iroh::{
15 endpoint::Connection,
16 protocol::{AcceptError, ProtocolHandler},
17 Endpoint, EndpointAddr, EndpointId, PublicKey, RelayUrl, Watcher,
18};
19use irpc::WithChannels;
20use n0_error::{e, stack_error};
21use n0_future::{
22 task::{self, AbortOnDropHandle, JoinSet},
23 time::Instant,
24 Stream, StreamExt as _,
25};
26use rand::{rngs::StdRng, SeedableRng};
27use serde::{Deserialize, Serialize};
28use tokio::sync::{broadcast, mpsc, oneshot};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, error_span, trace, warn, Instrument};
31
32use self::{
33 address_lookup::GossipAddressLookup,
34 util::{RecvLoop, SendLoop, Timers},
35};
36use crate::{
37 api::{self, Command, Event, GossipApi, RpcMessage},
38 metrics::Metrics,
39 proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
40};
41
42mod address_lookup;
43mod util;
44
/// Default ALPN protocol identifier for gossip connections.
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

/// Capacity of the per-connection outgoing message queue.
const SEND_QUEUE_CAP: usize = 64;
/// Capacity of the RPC channel from API handles into the actor.
const TO_ACTOR_CAP: usize = 64;
/// Capacity of the channel feeding protocol `InEvent`s into the actor.
const IN_EVENT_CAP: usize = 1024;
/// Capacity of the per-topic broadcast channel fanning events out to subscribers.
const TOPIC_EVENT_CAP: usize = 256;

/// Protocol events, bound to this crate's peer identity type.
pub type ProtoEvent = proto::Event<PublicKey>;
/// Protocol commands, bound to this crate's peer identity type.
pub type ProtoCommand = proto::Command<PublicKey>;

// Internal aliases binding the generic proto types to `PublicKey` peers.
type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;
66
/// Handle to a running gossip instance.
///
/// Cheaply cloneable; all clones share the same background actor. Derefs to
/// [`GossipApi`] for the subscribe/join API.
#[derive(Debug, Clone)]
pub struct Gossip {
    pub(crate) inner: Arc<Inner>,
}
89
// Deref to the API so all `GossipApi` methods are callable directly on `Gossip`.
impl std::ops::Deref for Gossip {
    type Target = GossipApi;
    fn deref(&self) -> &Self::Target {
        &self.inner.api
    }
}
96
/// Control messages sent from [`Gossip`] handles to the actor.
#[derive(Debug)]
enum LocalActorMessage {
    /// Hand an accepted gossip connection over to the actor.
    HandleConnection(Connection),
    /// Quit all topics and stop the actor; `reply` is signalled when done.
    Shutdown { reply: oneshot::Sender<()> },
}
102
/// Errors returned by the public [`Gossip`] methods.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum Error {
    /// The gossip actor terminated or its channels were dropped.
    ActorDropped {},
}
109
// A failed channel send means the actor's receiver is gone, i.e. the actor stopped.
impl<T> From<mpsc::error::SendError<T>> for Error {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(Error::ActorDropped)
    }
}
// A dropped oneshot sender likewise means the actor went away before replying.
impl From<oneshot::error::RecvError> for Error {
    fn from(_value: oneshot::error::RecvError) -> Self {
        e!(Error::ActorDropped)
    }
}
120
/// Shared state behind a [`Gossip`] handle.
#[derive(Debug)]
pub(crate) struct Inner {
    // Public API surface, backed by the RPC channel into the actor.
    api: GossipApi,
    // Control channel into the actor (connections, shutdown).
    local_tx: mpsc::Sender<LocalActorMessage>,
    // Aborts the actor task when the last `Gossip` clone is dropped.
    _actor_handle: AbortOnDropHandle<()>,
    // Maximum allowed gossip message size, copied from the proto config.
    max_message_size: usize,
    metrics: Arc<Metrics>,
}
129
impl ProtocolHandler for Gossip {
    /// Forwards accepted connections to the gossip actor.
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        self.handle_connection(connection)
            .await
            .map_err(AcceptError::from_err)?;
        Ok(())
    }

    /// Router shutdown hook.
    ///
    /// NOTE: `self.shutdown()` resolves to the inherent [`Gossip::shutdown`]
    /// (inherent methods take precedence over trait methods in method
    /// resolution), so this is not a recursive call.
    async fn shutdown(&self) {
        if let Err(err) = self.shutdown().await {
            warn!("error while shutting down gossip: {err:#}");
        }
    }
}
144
/// Builder for [`Gossip`].
#[derive(Debug, Clone)]
pub struct Builder {
    // Protocol (membership + broadcast) configuration.
    config: proto::Config,
    // Custom ALPN; falls back to [`GOSSIP_ALPN`] when `None`.
    alpn: Option<Bytes>,
}
151
impl Builder {
    /// Sets the maximum message size in bytes.
    pub fn max_message_size(mut self, size: usize) -> Self {
        self.config.max_message_size = size;
        self
    }

    /// Sets the membership (HyParView) configuration.
    pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
        self.config.membership = config;
        self
    }

    /// Sets the broadcast (Plumtree) configuration.
    pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
        self.config.broadcast = config;
        self
    }

    /// Sets a custom ALPN to use instead of [`GOSSIP_ALPN`].
    ///
    /// Peers must use the same ALPN to connect to each other.
    pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
        self.alpn = Some(alpn.as_ref().to_vec().into());
        self
    }

    /// Spawns the gossip actor on `endpoint` and returns a [`Gossip`] handle.
    pub fn spawn(self, endpoint: Endpoint) -> Gossip {
        let metrics = Arc::new(Metrics::default());
        // Register our address book with the endpoint so addresses learned
        // via gossip can be used for dialing.
        let address_lookup = GossipAddressLookup::default();
        endpoint.address_lookup().add(address_lookup.clone());
        let (actor, rpc_tx, local_tx) = Actor::new(
            endpoint,
            self.config,
            metrics.clone(),
            self.alpn,
            address_lookup,
        );
        let me = actor.endpoint.id().fmt_short();
        let max_message_size = actor.state.max_message_size();

        let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));

        let api = GossipApi::local(rpc_tx);

        Gossip {
            inner: Inner {
                api,
                local_tx,
                // Dropping the last handle aborts the actor.
                _actor_handle: AbortOnDropHandle::new(actor_handle),
                max_message_size,
                metrics,
            }
            .into(),
        }
    }
}
215
impl Gossip {
    /// Creates a default [`Builder`] for gossip.
    pub fn builder() -> Builder {
        Builder {
            config: Default::default(),
            alpn: None,
        }
    }

    /// Serves the gossip RPC API over a quinn endpoint (rpc feature only).
    #[cfg(feature = "rpc")]
    pub async fn listen(self, endpoint: quinn::Endpoint) {
        self.inner.api.listen(endpoint).await
    }

    /// Returns the maximum message size configured for this gossip instance.
    pub fn max_message_size(&self) -> usize {
        self.inner.max_message_size
    }

    /// Hands a new established gossip connection to the actor.
    ///
    /// Fails with [`Error::ActorDropped`] if the actor has stopped.
    pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
        self.inner
            .local_tx
            .send(LocalActorMessage::HandleConnection(conn))
            .await?;
        Ok(())
    }

    /// Gracefully quits all topics and stops the gossip actor, waiting for
    /// the actor to confirm completion.
    pub async fn shutdown(&self) -> Result<(), Error> {
        let (reply, reply_rx) = oneshot::channel();
        self.inner
            .local_tx
            .send(LocalActorMessage::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }

    /// Returns the metrics collected by this gossip instance.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.inner.metrics
    }
}
266
/// The gossip actor: owns the protocol state machine and all I/O,
/// driven by [`Actor::run`].
struct Actor {
    /// ALPN used when dialing peers.
    alpn: Bytes,
    /// Protocol state machine.
    state: proto::State<PublicKey, StdRng>,
    endpoint: Endpoint,
    /// Dial machine for outgoing connections.
    dialer: Dialer,
    /// Incoming RPC requests from API handles.
    rpc_rx: mpsc::Receiver<RpcMessage>,
    /// Incoming control messages (connections, shutdown).
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Input events for the state machine (cloned into connection loops).
    in_event_tx: mpsc::Sender<InEvent>,
    /// Input events for the state machine (consumed by the actor loop).
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Timers scheduled by the state machine.
    timers: Timers<Timer>,
    /// Per-topic state (neighbors, event subscribers, command streams).
    topics: HashMap<TopicId, TopicState>,
    /// Per-peer connection state.
    peers: HashMap<EndpointId, PeerState>,
    /// Merged stream of command streams for all joined topics.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Topics scheduled to be quit once no longer needed.
    quit_queue: VecDeque<TopicId>,
    /// Tasks running the send/recv loops, one per connection.
    connection_tasks: JoinSet<(EndpointId, Connection, Result<(), ConnectionLoopError>)>,
    metrics: Arc<Metrics>,
    /// Tasks forwarding topic events to individual subscribers.
    topic_event_forwarders: JoinSet<TopicId>,
    /// Address book where peer addresses learned via gossip are stored.
    address_lookup: GossipAddressLookup,
}
299
300impl Actor {
301 fn new(
302 endpoint: Endpoint,
303 config: proto::Config,
304 metrics: Arc<Metrics>,
305 alpn: Option<Bytes>,
306 address_lookup: GossipAddressLookup,
307 ) -> (
308 Self,
309 mpsc::Sender<RpcMessage>,
310 mpsc::Sender<LocalActorMessage>,
311 ) {
312 let peer_id = endpoint.id();
313 let dialer = Dialer::new(endpoint.clone());
314 let state = proto::State::new(
315 peer_id,
316 Default::default(),
317 config,
318 rand::rngs::StdRng::from_rng(&mut rand::rng()),
319 );
320 let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
321 let (local_tx, local_rx) = mpsc::channel(16);
322 let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);
323
324 let actor = Actor {
325 alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
326 endpoint,
327 state,
328 dialer,
329 rpc_rx,
330 in_event_rx,
331 in_event_tx,
332 timers: Timers::new(),
333 command_rx: StreamGroup::new().keyed(),
334 peers: Default::default(),
335 topics: Default::default(),
336 quit_queue: Default::default(),
337 connection_tasks: Default::default(),
338 metrics,
339 local_rx,
340 topic_event_forwarders: Default::default(),
341 address_lookup,
342 };
343
344 (actor, rpc_tx, local_tx)
345 }
346
347 pub async fn run(mut self) {
348 let mut addr_update_stream = self.setup().await;
349
350 let mut i = 0;
351 while self.event_loop(&mut addr_update_stream, i).await {
352 i += 1;
353 }
354 }
355
    /// Performs initial setup and returns the stream of endpoint address updates.
    ///
    /// Subscribes to address changes *before* reading the current address so no
    /// update can be missed, then publishes the initial address as peer data.
    async fn setup(&mut self) -> impl Stream<Item = EndpointAddr> + Send + Unpin + use<> {
        let addr_update_stream = self.endpoint.watch_addr().stream();
        let initial_addr = self.endpoint.addr();
        self.handle_addr_update(initial_addr).await;
        addr_update_stream
    }
366
    /// Runs a single iteration of the actor's main loop.
    ///
    /// `i` is only used for trace logging. Returns `false` when the actor
    /// should terminate (shutdown requested or all handles dropped).
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = EndpointAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        tokio::select! {
            biased;
            // Control messages first (`biased` keeps the polling order below).
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        self.handle_connection(conn.remote_id(), ConnOrigin::Accept, conn);
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            // Commands from topic subscribers. Guarded: an empty stream group
            // yields `None` immediately instead of pending.
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            // Our own address changed: publish it as peer data.
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        // Only report a disconnect if no active connection exists.
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    // `None` result: the dial was cancelled before completing.
                    None => {
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                // Safe: the actor itself holds a clone of `in_event_tx`.
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                // Drain every timer that is due by now.
                let now = Instant::now();
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            // A topic event forwarder finished (its subscriber went away):
            // quit the topic if nothing else needs it.
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }
476
477 async fn handle_addr_update(&mut self, endpoint_addr: EndpointAddr) {
478 let peer_data = encode_peer_data(&endpoint_addr.into());
480 self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
481 .await
482 }
483
    /// Handles one item from a topic's command stream.
    ///
    /// `command` is `None` when the stream ended; the stream's key is then
    /// dropped and the topic may be scheduled for quitting.
    async fn handle_command(
        &mut self,
        topic: TopicId,
        key: stream_group::Key,
        command: Option<Command>,
    ) {
        debug!(?topic, ?key, ?command, "handle command");
        let Some(state) = self.topics.get_mut(&topic) else {
            // Can happen if commands race with the topic being quit.
            warn!("received command for unknown topic");
            return;
        };
        match command {
            Some(command) => {
                // Map the public API command onto the protocol command.
                let command = match command {
                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
                    Command::BroadcastNeighbors(message) => {
                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                    }
                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
                };
                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
                    .await;
            }
            None => {
                // Command stream ended: unregister it; quit the topic if unused.
                state.command_rx_keys.remove(&key);
                if !state.still_needed() {
                    self.quit_queue.push_back(topic);
                    self.process_quit_queue().await;
                }
            }
        }
    }
517
    /// Registers a new connection (dialed or accepted) and spawns its IO loop.
    ///
    /// If the peer already has an active connection, the new one replaces it
    /// as the send target and the old one is tracked in `other_conns`.
    fn handle_connection(&mut self, peer_id: EndpointId, origin: ConnOrigin, conn: Connection) {
        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
        let conn_id = conn.stable_id();

        // Messages queued while the peer was pending get flushed on this connection.
        let queue = match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
            Entry::Vacant(entry) => {
                entry.insert(PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                });
                Vec::new()
            }
        };

        let max_message_size = self.state.max_message_size();
        let in_event_tx = self.in_event_tx.clone();

        // Completion is handled in `handle_connection_task_finished`.
        self.connection_tasks.spawn(
            async move {
                let res = connection_loop(
                    peer_id,
                    conn.clone(),
                    origin,
                    send_rx,
                    in_event_tx,
                    max_message_size,
                    queue,
                )
                .await;
                (peer_id, conn, res)
            }
            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
        );
    }
555
556 #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
557 async fn handle_connection_task_finished(
558 &mut self,
559 peer_id: EndpointId,
560 conn: Connection,
561 task_result: Result<(), ConnectionLoopError>,
562 ) {
563 if conn.close_reason().is_none() {
564 conn.close(0u32.into(), b"close from disconnect");
565 }
566 let reason = conn.close_reason().expect("just closed");
567 let error = task_result.err();
568 debug!(%reason, ?error, "connection closed");
569 if let Some(PeerState::Active {
570 active_conn_id,
571 other_conns,
572 ..
573 }) = self.peers.get_mut(&peer_id)
574 {
575 if conn.stable_id() == *active_conn_id {
576 debug!("active send connection closed, mark peer as disconnected");
577 self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
578 .await;
579 } else {
580 other_conns.retain(|x| *x != conn.stable_id());
581 debug!("remaining {} other connections", other_conns.len() + 1);
582 }
583 } else {
584 debug!("peer already marked as disconnected");
585 }
586 }
587
    /// Handles an RPC request from an API handle.
    ///
    /// The only request is `Join`: it registers the subscriber's event and
    /// command channels on the topic and issues a protocol `Join` with the
    /// provided bootstrap peers.
    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
        trace!("handle to_actor {msg:?}");
        match msg {
            RpcMessage::Join(msg) => {
                let WithChannels {
                    inner,
                    rx,
                    tx,
                    span: _,
                } = msg;
                let api::JoinRequest {
                    topic_id,
                    bootstrap,
                } = inner;
                let TopicState {
                    neighbors,
                    event_sender,
                    command_rx_keys,
                } = self.topics.entry(topic_id).or_default();
                // Replay the current neighbor set to the new subscriber so it
                // does not miss `NeighborUp`s from before it joined.
                let mut sender_dead = false;
                if !neighbors.is_empty() {
                    for neighbor in neighbors.iter() {
                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
                            sender_dead = true;
                            break;
                        }
                    }
                }

                // Only wire up the event forwarder if the subscriber is still alive.
                if !sender_dead {
                    let fut =
                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
                    self.topic_event_forwarders
                        .spawn(fut.instrument(tracing::Span::current()));
                }
                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
                let key = self.command_rx.insert(command_rx);
                command_rx_keys.insert(key);

                self.handle_in_event(
                    InEvent::Command(
                        topic_id,
                        ProtoCommand::Join(bootstrap.into_iter().collect()),
                    ),
                    now,
                )
                .await;
            }
        }
    }
639
    /// Feeds one event through the state machine, then drains the quit queue.
    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }
644
    /// Issues a protocol `Quit` for every queued topic and removes its local state.
    async fn process_quit_queue(&mut self) {
        while let Some(topic_id) = self.quit_queue.pop_front() {
            self.handle_in_event_inner(
                InEvent::Command(topic_id, ProtoCommand::Quit),
                Instant::now(),
            )
            .await;
            if self.topics.remove(&topic_id).is_some() {
                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
            }
        }
    }
657
    /// Feeds one event through the protocol state machine and executes the
    /// resulting output events (sends, timers, subscriber events, peer data).
    ///
    /// Does not drain the quit queue; use [`Self::handle_in_event`] for that.
    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        // Timer expirations are too noisy for debug level.
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                // Connection task ended; the disconnect is handled
                                // when the task result is processed.
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            // The first queued message triggers the dial.
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    // Track the neighbor set so late subscribers can be caught up.
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    // Dropping the peer state closes its send channel, which
                    // ends the connection task.
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(endpoint_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {endpoint_id}: {err}"),
                    Ok(info) => {
                        // Store learned addresses so the dialer can reach the peer.
                        debug!(peer = ?endpoint_id, "add known addrs: {info:?}");
                        let mut endpoint_addr = EndpointAddr::new(endpoint_id);
                        for addr in info.direct_addresses {
                            endpoint_addr = endpoint_addr.with_ip_addr(addr);
                        }
                        if let Some(relay_url) = info.relay_url {
                            endpoint_addr = endpoint_addr.with_relay_url(relay_url);
                        }

                        self.address_lookup.add(endpoint_addr);
                    }
                },
            }
        }
    }
745}
746
/// Stable id of a QUIC connection, as returned by `Connection::stable_id`.
type ConnId = usize;

/// Connection state of a single peer.
#[derive(Debug)]
enum PeerState {
    /// No connection yet; messages pile up in `queue` until a dial completes.
    Pending {
        queue: Vec<ProtoMessage>,
    },
    /// At least one live connection; messages go to the active send loop.
    Active {
        /// Send queue into the currently active connection's send loop.
        active_send_tx: mpsc::Sender<ProtoMessage>,
        /// Stable id of the active connection.
        active_conn_id: ConnId,
        /// Ids of superseded connections, kept until their tasks finish.
        other_conns: Vec<ConnId>,
    },
}
760
impl PeerState {
    /// Makes `conn_id` the active connection for this peer.
    ///
    /// Returns the messages queued while pending, to be flushed on the new
    /// connection. A previously active connection is demoted to `other_conns`.
    fn accept_conn(
        &mut self,
        send_tx: mpsc::Sender<ProtoMessage>,
        conn_id: ConnId,
    ) -> Vec<ProtoMessage> {
        match self {
            PeerState::Pending { queue } => {
                let queue = std::mem::take(queue);
                *self = PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                };
                queue
            }
            PeerState::Active {
                active_send_tx,
                active_conn_id,
                other_conns,
            } => {
                // The newest connection wins; remember the old one so its task
                // finishing does not mark the peer as disconnected.
                other_conns.push(*active_conn_id);
                *active_send_tx = send_tx;
                *active_conn_id = conn_id;
                Vec::new()
            }
        }
    }
}
795
796impl Default for PeerState {
797 fn default() -> Self {
798 PeerState::Pending { queue: Vec::new() }
799 }
800}
801
/// Per-topic bookkeeping in the actor.
#[derive(Debug)]
struct TopicState {
    /// Current gossip neighbors for this topic.
    neighbors: BTreeSet<EndpointId>,
    /// Fan-out channel towards all topic subscribers.
    event_sender: broadcast::Sender<ProtoEvent>,
    /// Keys of the command streams feeding this topic.
    command_rx_keys: HashSet<stream_group::Key>,
}
811
812impl Default for TopicState {
813 fn default() -> Self {
814 let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
815 Self {
816 neighbors: Default::default(),
817 command_rx_keys: Default::default(),
818 event_sender,
819 }
820 }
821}
822
impl TopicState {
    /// Returns true while anyone can still send commands or receive events,
    /// i.e. while the topic must stay joined.
    fn still_needed(&self) -> bool {
        !self.command_rx_keys.is_empty() || self.event_sender.receiver_count() > 0
    }

    /// Test helper: true once we have at least one neighbor on this topic.
    #[cfg(test)]
    fn joined(&self) -> bool {
        !self.neighbors.is_empty()
    }
}
836
/// Whether a connection was accepted from a remote or dialed by us.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    Accept,
    Dial,
}
843
/// Errors that can terminate a connection's send/recv loops.
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
enum ConnectionLoopError {
    #[error(transparent)]
    Write {
        source: self::util::WriteError,
    },
    #[error(transparent)]
    Read {
        source: self::util::ReadError,
    },
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: iroh::endpoint::ConnectionError,
    },
    /// The actor stopped while the connection loop was still running.
    ActorDropped {},
}

// A failed send towards the actor means the actor (receiver side) has stopped.
impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(ConnectionLoopError::ActorDropped)
    }
}
869
/// Runs the send and receive loops for a single gossip connection until both
/// complete or one errors.
///
/// `queue` holds messages buffered before the connection existed; the send
/// loop flushes it first.
async fn connection_loop(
    from: PublicKey,
    conn: Connection,
    origin: ConnOrigin,
    send_rx: mpsc::Receiver<ProtoMessage>,
    in_event_tx: mpsc::Sender<InEvent>,
    max_message_size: usize,
    queue: Vec<ProtoMessage>,
) -> Result<(), ConnectionLoopError> {
    debug!(?origin, "connection established");

    let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
    let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);

    let send_fut = send_loop.run(queue).instrument(error_span!("send"));
    let recv_fut = recv_loop.run().instrument(error_span!("recv"));

    // Run both loops to completion; send errors are reported before recv errors.
    let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
    send_res?;
    recv_res?;
    Ok(())
}
892
/// Peer addressing info exchanged as gossip peer data (postcard-encoded).
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct AddrInfo {
    relay_url: Option<RelayUrl>,
    direct_addresses: BTreeSet<SocketAddr>,
}
898
impl From<EndpointAddr> for AddrInfo {
    /// Keeps the first relay URL and all direct IP addresses of the endpoint.
    fn from(endpoint_addr: EndpointAddr) -> Self {
        Self {
            relay_url: endpoint_addr.relay_urls().next().cloned(),
            direct_addresses: endpoint_addr.ip_addrs().cloned().collect(),
        }
    }
}
907
/// Serializes [`AddrInfo`] into the opaque [`PeerData`] carried by the protocol.
fn encode_peer_data(info: &AddrInfo) -> PeerData {
    // Postcard serialization of this plain data struct cannot fail.
    let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo may not fail");
    PeerData::new(bytes)
}
912
913fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
914 let bytes = peer_data.as_bytes();
915 if bytes.is_empty() {
916 return Ok(AddrInfo::default());
917 }
918 let info = postcard::from_bytes(bytes)?;
919 Ok(info)
920}
921
/// Forwards events of one topic from the actor's broadcast channel to a single
/// subscriber, until either side goes away.
///
/// A subscriber that falls behind the broadcast channel capacity receives
/// [`Event::Lagged`] instead of the missed events.
async fn topic_subscriber_loop(
    sender: irpc::channel::mpsc::Sender<Event>,
    mut topic_events: broadcast::Receiver<ProtoEvent>,
) {
    loop {
        tokio::select! {
            biased;
            msg = topic_events.recv() => {
                let event = match msg {
                    // Topic closed on the actor side: stop forwarding.
                    Err(broadcast::error::RecvError::Closed) => break,
                    Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
                    Ok(event) => event.into(),
                };
                if sender.send(event).await.is_err() {
                    break;
                }
            }
            // The subscriber hung up: stop forwarding.
            _ = sender.closed() => break,
        }
    }
}
943
/// Boxed stream of commands as received from an API subscriber.
type BoxedCommandReceiver =
    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;

/// Stream adapter that tags each command with its topic and emits a final
/// `(topic, None)` item when the underlying stream ends or errors.
#[derive(derive_more::Debug)]
struct TopicCommandStream {
    topic_id: TopicId,
    #[debug("CommandStream")]
    stream: BoxedCommandReceiver,
    // Set once the inner stream ended or errored; the stream then terminates.
    closed: bool,
}
955
impl TopicCommandStream {
    /// Wraps `stream`, tagging all of its items with `topic_id`.
    fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
        Self {
            topic_id,
            stream,
            closed: false,
        }
    }
}
965
966impl Stream for TopicCommandStream {
967 type Item = (TopicId, Option<Command>);
968 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
969 if self.closed {
970 return Poll::Ready(None);
971 }
972 match Pin::new(&mut self.stream).poll_next(cx) {
973 Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
974 Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
975 self.closed = true;
976 Poll::Ready(Some((self.topic_id, None)))
977 }
978 Poll::Pending => Poll::Pending,
979 }
980 }
981}
982
/// Dials peers and tracks in-flight dial attempts.
#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    /// Tasks performing the dials; a `None` result means the dial was cancelled.
    pending: JoinSet<(
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    )>,
    /// Cancellation tokens for dials currently in flight, keyed by peer.
    pending_dials: HashMap<EndpointId, CancellationToken>,
}
992
993impl Dialer {
994 fn new(endpoint: Endpoint) -> Self {
996 Self {
997 endpoint,
998 pending: Default::default(),
999 pending_dials: Default::default(),
1000 }
1001 }
1002
1003 fn queue_dial(&mut self, endpoint_id: EndpointId, alpn: Bytes) {
1005 if self.is_pending(endpoint_id) {
1006 return;
1007 }
1008 let cancel = CancellationToken::new();
1009 self.pending_dials.insert(endpoint_id, cancel.clone());
1010 let endpoint = self.endpoint.clone();
1011 self.pending.spawn(
1012 async move {
1013 let res = tokio::select! {
1014 biased;
1015 _ = cancel.cancelled() => None,
1016 res = endpoint.connect(endpoint_id, &alpn) => Some(res),
1017 };
1018 (endpoint_id, res)
1019 }
1020 .instrument(tracing::Span::current()),
1021 );
1022 }
1023
1024 fn is_pending(&self, endpoint: EndpointId) -> bool {
1026 self.pending_dials.contains_key(&endpoint)
1027 }
1028
1029 async fn next_conn(
1032 &mut self,
1033 ) -> (
1034 EndpointId,
1035 Option<Result<Connection, iroh::endpoint::ConnectError>>,
1036 ) {
1037 match self.pending_dials.is_empty() {
1038 false => {
1039 let (endpoint_id, res) = loop {
1040 match self.pending.join_next().await {
1041 Some(Ok((endpoint_id, res))) => {
1042 self.pending_dials.remove(&endpoint_id);
1043 break (endpoint_id, res);
1044 }
1045 Some(Err(e)) => {
1046 error!("next conn error: {:?}", e);
1047 }
1048 None => {
1049 error!("no more pending conns available");
1050 std::future::pending().await
1051 }
1052 }
1053 };
1054
1055 (endpoint_id, res)
1056 }
1057 true => std::future::pending().await,
1058 }
1059 }
1060}
1061
1062#[cfg(test)]
1063pub(crate) mod test {
1064 use std::time::Duration;
1065
1066 use bytes::Bytes;
1067 use futures_concurrency::future::TryJoin;
1068 use iroh::{
1069 address_lookup::memory::MemoryLookup, endpoint::BindError, protocol::Router, RelayMap,
1070 RelayMode, SecretKey,
1071 };
1072 use n0_error::{AnyError, Result, StdResultExt};
1073 use n0_tracing_test::traced_test;
1074 use rand::{CryptoRng, Rng};
1075 use tokio::{spawn, time::timeout};
1076 use tokio_util::sync::CancellationToken;
1077 use tracing::{info, instrument};
1078
1079 use super::*;
1080 use crate::api::{ApiError, GossipReceiver, GossipSender};
1081
    /// Test driver that steps the gossip [`Actor`] manually instead of
    /// spawning its run loop.
    struct ManualActorLoop {
        actor: Actor,
        // Number of event-loop iterations performed so far.
        step: usize,
    }

    // Deref to the actor so its fields/methods are directly accessible in tests.
    impl std::ops::Deref for ManualActorLoop {
        type Target = Actor;

        fn deref(&self) -> &Self::Target {
            &self.actor
        }
    }

    impl std::ops::DerefMut for ManualActorLoop {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.actor
        }
    }
1100
1101 type EndpointHandle = tokio::task::JoinHandle<Result<()>>;
1102
    impl ManualActorLoop {
        /// Wraps `actor` after running its setup phase. The returned address
        /// update stream is dropped; steps use a never-ready stream instead.
        #[instrument(skip_all, fields(me = %actor.endpoint.id().fmt_short()))]
        async fn new(mut actor: Actor) -> Self {
            let _ = actor.setup().await;
            Self { actor, step: 0 }
        }

        /// Runs a single event-loop iteration; returns `false` once the actor
        /// wants to stop.
        #[instrument(skip_all, fields(me = %self.endpoint.id().fmt_short()))]
        async fn step(&mut self) -> bool {
            let ManualActorLoop { actor, step } = self;
            *step += 1;
            // Address updates are not exercised by these tests.
            let addr_update_stream = &mut futures_lite::stream::pending();
            actor.event_loop(addr_update_stream, *step).await
        }

        /// Runs `n` event-loop iterations.
        async fn steps(&mut self, n: usize) {
            for _ in 0..n {
                self.step().await;
            }
        }

        /// Steps until the actor signals termination.
        async fn finish(mut self) {
            while self.step().await {}
        }
    }
1130
    impl Gossip {
        /// Creates a gossip instance without spawning its actor, returning the
        /// actor for manual stepping plus the accept-loop task handle.
        async fn t_new_with_actor(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Actor, EndpointHandle), BindError> {
            let endpoint = create_endpoint(rng, relay_map, None).await?;
            let metrics = Arc::new(Metrics::default());
            let address_lookup = GossipAddressLookup::default();
            endpoint.address_lookup().add(address_lookup.clone());

            let (actor, to_actor_tx, conn_tx) =
                Actor::new(endpoint, config, metrics.clone(), None, address_lookup);
            let max_message_size = actor.state.max_message_size();

            // Dummy task handle: the actor is driven by the caller, not spawned.
            let _actor_handle =
                AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
            let gossip = Self {
                inner: Inner {
                    api: GossipApi::local(to_actor_tx),
                    local_tx: conn_tx,
                    _actor_handle,
                    max_message_size,
                    metrics,
                }
                .into(),
            };

            let endpoint_task = task::spawn(endpoint_loop(
                actor.endpoint.clone(),
                gossip.clone(),
                cancel.child_token(),
            ));

            Ok((gossip, actor, endpoint_task))
        }

        /// Like [`Self::t_new_with_actor`], but spawns the actor normally and
        /// returns a guard that aborts it on drop.
        async fn t_new(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
            let (g, actor, ep_handle) =
                Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
            let ep = actor.endpoint.clone();
            let me = ep.id().fmt_short();
            let actor_handle =
                task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
            Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
        }
    }
1191
    /// Binds a test endpoint using the given relay map, optionally registering
    /// a shared in-memory address lookup, and waits for it to come online.
    pub(crate) async fn create_endpoint(
        rng: &mut rand_chacha::ChaCha12Rng,
        relay_map: RelayMap,
        memory_lookup: Option<MemoryLookup>,
    ) -> Result<Endpoint, BindError> {
        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .secret_key(SecretKey::generate(rng))
            .alpns(vec![GOSSIP_ALPN.to_vec()])
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .await?;

        if let Some(memory_lookup) = memory_lookup {
            ep.address_lookup().add(memory_lookup);
        }
        ep.online().await;
        Ok(ep)
    }
1210
    /// Test accept loop: forwards incoming gossip connections to `gossip`
    /// until cancelled or the endpoint closes.
    async fn endpoint_loop(
        endpoint: Endpoint,
        gossip: Gossip,
        cancel: CancellationToken,
    ) -> Result<()> {
        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => break,
                incoming = endpoint.accept() => match incoming {
                    None => break,
                    Some(incoming) => {
                        let connecting = match incoming.accept() {
                            Ok(connecting) => connecting,
                            Err(err) => {
                                // Non-fatal: keep accepting further connections.
                                warn!("incoming connection failed: {err:#}");
                                continue;
                            }
                        };
                        let connection = connecting
                            .await
                            .std_context("await incoming connection")?;
                        gossip.handle_connection(connection).await?
                    }
                }
            }
        }
        Ok(())
    }
1242
    #[tokio::test]
    #[traced_test]
    async fn gossip_net_smoke() {
        // Smoke test: three endpoints form a gossip chain over a local relay
        // (endpoint 2 bootstraps via 1, endpoint 3 via 2). Endpoint 1
        // broadcasts `len` messages; endpoints 2 and 3 must receive all of
        // them.
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        // Shared in-memory address book so the endpoints can resolve each
        // other without real discovery.
        let memory_lookup = MemoryLookup::new();

        let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();
        let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(memory_lookup.clone()))
            .await
            .unwrap();

        let go1 = Gossip::builder().spawn(ep1.clone());
        let go2 = Gossip::builder().spawn(ep2.clone());
        let go3 = Gossip::builder().spawn(ep3.clone());
        debug!("peer1 {:?}", ep1.id());
        debug!("peer2 {:?}", ep2.id());
        debug!("peer3 {:?}", ep3.id());
        let pi1 = ep1.id();
        let pi2 = ep2.id();

        // One accept loop per endpoint; all stopped via `cancel` at the end.
        let cancel = CancellationToken::new();
        let tasks = [
            spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
            spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
            spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
        ];

        debug!("----- adding peers ----- ");
        let topic: TopicId = blake3::hash(b"foobar").into();

        // Publish relay-reachable addresses for endpoints 1 and 2 (the two
        // bootstrap targets) into the shared lookup.
        let addr1 = EndpointAddr::new(pi1).with_relay_url(relay_url.clone());
        let addr2 = EndpointAddr::new(pi2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr1.clone());
        memory_lookup.add_endpoint_info(addr2.clone());

        debug!("----- joining ----- ");
        // 1 joins with no bootstrap, 2 bootstraps via 1, 3 via 2.
        let [sub1, mut sub2, mut sub3] = [
            go1.subscribe_and_join(topic, vec![]),
            go2.subscribe_and_join(topic, vec![pi1]),
            go3.subscribe_and_join(topic, vec![pi2]),
        ]
        .try_join()
        .await
        .unwrap();

        let (sink1, _stream1) = sub1.split();

        // Number of messages broadcast and expected at each receiver.
        let len = 2;

        // Publisher task: broadcast `len` messages from endpoint 1.
        let pub1 = spawn(async move {
            for i in 0..len {
                let message = format!("hi{i}");
                info!("go1 broadcast: {message:?}");
                sink1.broadcast(message.into_bytes().into()).await.unwrap();
                tokio::time::sleep(Duration::from_micros(1)).await;
            }
        });

        // Collector task for endpoint 2: gather message contents until `len`
        // broadcasts arrived, ignoring non-Received events.
        let sub2 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub2.next().await.unwrap().unwrap();
                info!("go2 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        // Same collector for endpoint 3.
        let sub3 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub3.next().await.unwrap().unwrap();
                info!("go3 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        timeout(Duration::from_secs(10), pub1)
            .await
            .unwrap()
            .unwrap();
        let recv2 = timeout(Duration::from_secs(10), sub2)
            .await
            .unwrap()
            .unwrap();
        let recv3 = timeout(Duration::from_secs(10), sub3)
            .await
            .unwrap()
            .unwrap();

        // Delivery order is not guaranteed, so compare as sets.
        let expected: HashSet<Bytes> = (0..len)
            .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
            .collect();
        assert_eq!(HashSet::from_iter(recv2), expected);
        assert_eq!(HashSet::from_iter(recv3), expected);

        // Stop the accept loops and verify they shut down cleanly.
        cancel.cancel();
        for t in tasks {
            timeout(Duration::from_secs(10), t)
                .await
                .unwrap()
                .unwrap()
                .unwrap();
        }
    }
1372
1373 #[tokio::test]
1383 #[traced_test]
1384 async fn subscription_cleanup() -> Result {
1385 let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1386 let ct = CancellationToken::new();
1387 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1388
1389 let (go1, actor, ep1_handle) =
1391 Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
1392 let mut actor = ManualActorLoop::new(actor).await;
1393
1394 let (go2, ep2, ep2_handle, _test_actor_handle) =
1396 Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1397
1398 let endpoint_id1 = actor.endpoint.id();
1399 let endpoint_id2 = ep2.id();
1400 tracing::info!(
1401 endpoint_1 = %endpoint_id1.fmt_short(),
1402 endpoint_2 = %endpoint_id2.fmt_short(),
1403 "endpoints ready"
1404 );
1405
1406 let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
1407 tracing::info!(%topic, "joining");
1408
1409 let ct2 = ct.clone();
1416 let go2_task = async move {
1417 let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();
1418
1419 let subscribe_fut = async {
1420 while let Some(ev) = sub_rx.try_next().await? {
1421 match ev {
1422 Event::Lagged => tracing::debug!("missed some messages :("),
1423 Event::Received(_) => unreachable!("test does not send messages"),
1424 other => tracing::debug!(?other, "gs event"),
1425 }
1426 }
1427
1428 tracing::debug!("subscribe stream ended");
1429 Ok::<_, AnyError>(())
1430 };
1431
1432 tokio::select! {
1433 _ = ct2.cancelled() => Ok(()),
1434 res = subscribe_fut => res,
1435 }
1436 }
1437 .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
1438 let go2_handle = task::spawn(go2_task);
1439
1440 let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
1442 let memory_lookup = MemoryLookup::new();
1443 memory_lookup.add_endpoint_info(addr2);
1444 actor.endpoint.address_lookup().add(memory_lookup);
1445 let (tx, mut rx) = mpsc::channel::<()>(1);
1447 let ct1 = ct.clone();
1448 let go1_task = async move {
1449 tracing::info!("subscribing the first time");
1451 let sub_1a = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1452
1453 rx.recv().await.expect("signal for second subscribe");
1455 tracing::info!("subscribing a second time");
1456 let sub_1b = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1457 drop(sub_1a);
1458
1459 rx.recv().await.expect("signal for second subscribe");
1461 tracing::info!("dropping all handles");
1462 drop(sub_1b);
1463
1464 ct1.cancelled().await;
1466 drop(go1);
1467
1468 Ok::<_, AnyError>(())
1469 }
1470 .instrument(tracing::debug_span!("endpoint_1", %endpoint_id1));
1471 let go1_handle = task::spawn(go1_task);
1472
1473 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1478 assert!(state.joined());
1479
1480 tx.send(())
1482 .await
1483 .std_context("signal additional subscribe")?;
1484 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1486 assert!(state.joined());
1487
1488 tx.send(()).await.std_context("signal drop handles")?;
1490 actor.steps(2).await; assert!(!actor.topics.contains_key(&topic));
1492
1493 ct.cancel();
1495 let wait = Duration::from_secs(2);
1496 timeout(wait, ep1_handle)
1497 .await
1498 .std_context("wait endpoint1 task")?
1499 .std_context("join endpoint1 task")??;
1500 timeout(wait, ep2_handle)
1501 .await
1502 .std_context("wait endpoint2 task")?
1503 .std_context("join endpoint2 task")??;
1504 timeout(wait, go1_handle)
1505 .await
1506 .std_context("wait gossip1 task")?
1507 .std_context("join gossip1 task")??;
1508 timeout(wait, go2_handle)
1509 .await
1510 .std_context("wait gossip2 task")?
1511 .std_context("join gossip2 task")??;
1512 timeout(wait, actor.finish())
1513 .await
1514 .std_context("wait actor finish")?;
1515
1516 Ok(())
1517 }
1518
    #[tokio::test]
    #[traced_test]
    async fn can_reconnect() -> Result {
        // Checks that a peer which unsubscribes from a topic can subscribe
        // again later: endpoint 1 must observe NeighborDown followed by
        // NeighborUp for endpoint 2.
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let (go1, ep1, ep1_handle, _test_actor_handle1) =
            Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;

        let (go2, ep2, ep2_handle, _test_actor_handle2) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = ep1.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"can_reconnect").into();
        tracing::info!(%topic, "joining");

        let ct2 = ct.child_token();
        // Channel used to tell endpoint 2 when to unsubscribe / resubscribe.
        let (tx, mut rx) = mpsc::channel::<()>(1);
        // Make endpoint 1 reachable from endpoint 2.
        let addr1 = EndpointAddr::new(endpoint_id1).with_relay_url(relay_url.clone());
        let memory_lookup = MemoryLookup::new();
        memory_lookup.add_endpoint_info(addr1);
        ep2.address_lookup().add(memory_lookup.clone());
        let go2_task = async move {
            let mut sub = go2.subscribe(topic, Vec::new()).await?;
            sub.joined().await?;

            // First signal: drop the subscription entirely.
            rx.recv().await.expect("signal to unsubscribe");
            tracing::info!("unsubscribing");
            drop(sub);

            // Second signal: subscribe again, bootstrapping via endpoint 1.
            rx.recv().await.expect("signal to subscribe again");
            tracing::info!("resubscribing");
            let mut sub = go2.subscribe(topic, vec![endpoint_id1]).await?;

            sub.joined().await?;
            tracing::info!("subscription successful!");

            ct2.cancelled().await;

            Ok::<_, ApiError>(())
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        // Make endpoint 2 reachable from endpoint 1 as well.
        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        memory_lookup.add_endpoint_info(addr2);
        ep1.address_lookup().add(memory_lookup);

        let mut sub = go1.subscribe(topic, vec![endpoint_id2]).await?;
        sub.joined().await?;

        tx.send(()).await.std_context("signal unsubscribe")?;

        // Endpoint 2 dropped its subscription: expect NeighborDown.
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor down")??;
        assert_eq!(ev, Some(Event::NeighborDown(endpoint_id2)));
        tracing::info!("endpoint 2 left");

        tx.send(()).await.std_context("signal resubscribe")?;

        // Endpoint 2 resubscribed: expect NeighborUp again.
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor up")??;
        assert_eq!(ev, Some(Event::NeighborUp(endpoint_id2)));
        tracing::info!("endpoint 2 rejoined!");

        // Orderly shutdown of all background tasks.
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;

        Result::Ok(())
    }
1625
    #[tokio::test]
    #[traced_test]
    async fn can_die_and_reconnect() -> Result {
        // Simulates a peer crashing and restarting with the same secret key:
        // a receiver endpoint must get a broadcast from both the first and
        // the second incarnation of the broadcasting peer.

        // Runs `fut` on its own single-threaded runtime in a dedicated OS
        // thread; cancelling `cancel` stops the future, making the thread's
        // join handle yield `None`.
        fn run_in_thread<T: Send + 'static>(
            cancel: CancellationToken,
            fut: impl std::future::Future<Output = T> + Send + 'static,
        ) -> std::thread::JoinHandle<Option<T>> {
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(async move { cancel.run_until_cancelled(fut).await })
            })
        }

        // Binds an endpoint with `secret_key` and spawns gossip behind a
        // router that accepts the gossip ALPN.
        async fn spawn_gossip(
            secret_key: SecretKey,
            relay_map: RelayMap,
        ) -> Result<(Router, Gossip), BindError> {
            let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
                .secret_key(secret_key)
                .insecure_skip_relay_cert_verify(true)
                .bind()
                .await?;
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            Ok((router, gossip))
        }

        // Joins `topic_id` via `bootstrap_addr`, broadcasts one message, then
        // parks forever — the caller "kills" it by cancelling the runtime
        // thread (see `run_in_thread`).
        async fn broadcast_once(
            secret_key: SecretKey,
            relay_map: RelayMap,
            bootstrap_addr: EndpointAddr,
            topic_id: TopicId,
            message: String,
        ) -> Result {
            let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
            info!(endpoint_id = %router.endpoint().id().fmt_short(), "broadcast endpoint spawned");
            let bootstrap = vec![bootstrap_addr.id];
            let memory_lookup = MemoryLookup::new();
            memory_lookup.add_endpoint_info(bootstrap_addr);
            router.endpoint().address_lookup().add(memory_lookup);
            let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
            topic.broadcast(message.as_bytes().to_vec().into()).await?;
            std::future::pending::<()>().await;
            Ok(())
        }

        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let topic_id = TopicId::from_bytes(rng.random());

        // The receiver endpoint reports its address once via `addr_tx` and
        // forwards every received message content through `msgs_recv_tx`.
        let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
        let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
        let recv_task = tokio::task::spawn({
            let relay_map = relay_map.clone();
            let secret_key = SecretKey::generate(&mut rng);
            async move {
                let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
                router.endpoint().online().await;
                let addr = router.endpoint().addr();
                info!(endpoint_id = %addr.id.fmt_short(), "recv endpoint spawned");
                addr_tx.send(addr).unwrap();
                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
                while let Some(event) = topic.try_next().await.unwrap() {
                    if let Event::Received(message) = event {
                        let message = std::str::from_utf8(&message.content)
                            .std_context("decode broadcast message")?
                            .to_string();
                        msgs_recv_tx
                            .send(message)
                            .await
                            .std_context("forward received message")?;
                    }
                }
                Ok::<_, AnyError>(())
            }
        });

        let endpoint0_addr = addr_rx.await.std_context("receive endpoint address")?;
        let max_wait = Duration::from_secs(5);

        // First incarnation: broadcast "msg1", then kill its thread.
        let cancel = CancellationToken::new();
        let secret = SecretKey::generate(&mut rng);
        let join_handle_1 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg1".to_string(),
            ),
        );
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for first broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg1");
        info!("kill broadcast endpoint");
        cancel.cancel();

        // Second incarnation reuses the same secret key, so from the
        // receiver's perspective the same peer reconnects after dying.
        let cancel = CancellationToken::new();
        let join_handle_2 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg2".to_string(),
            ),
        );
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for second broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg2");
        info!("kill broadcast endpoint");
        cancel.cancel();

        info!("kill recv endpoint");
        recv_task.abort();
        // Both broadcast futures were cancelled, so the threads yield `None`.
        assert!(join_handle_1.join().unwrap().is_none());
        assert!(join_handle_2.join().unwrap().is_none());

        Ok(())
    }
1773
1774 #[tokio::test]
1775 #[traced_test]
1776 async fn gossip_change_alpn() -> n0_error::Result<()> {
1777 let alpn = b"my-gossip-alpn";
1778 let topic_id = TopicId::from([0u8; 32]);
1779
1780 let ep1 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
1781 let ep2 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
1782 let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
1783 let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
1784 let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
1785 let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();
1786
1787 let addr1 = router1.endpoint().addr();
1788 let id1 = addr1.id;
1789 let memory_lookup = MemoryLookup::new();
1790 memory_lookup.add_endpoint_info(addr1);
1791 router2.endpoint().address_lookup().add(memory_lookup);
1792
1793 let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
1794 let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;
1795
1796 timeout(Duration::from_secs(3), topic1.joined())
1797 .await
1798 .std_context("wait topic1 join")??;
1799 timeout(Duration::from_secs(3), topic2.joined())
1800 .await
1801 .std_context("wait topic2 join")??;
1802 router1.shutdown().await.std_context("shutdown router1")?;
1803 router2.shutdown().await.std_context("shutdown router2")?;
1804 Ok(())
1805 }
1806
    #[tokio::test]
    #[traced_test]
    async fn gossip_rely_on_gossip_address_lookup() -> n0_error::Result<()> {
        // Endpoint 3's lookup only ever contains endpoint 1's address, yet it
        // ends up neighbors with both 1 and 2 — so endpoint 2's address has
        // to reach it through gossip's own address lookup rather than the
        // memory lookup. (NOTE(review): inferred from the assertions below —
        // confirm against `GossipAddressLookup`.)
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);

        // Spawns one endpoint with gossip subscribed to the fixed topic and
        // returns the split sender/receiver halves of the subscription.
        async fn spawn(
            rng: &mut impl CryptoRng,
        ) -> n0_error::Result<(EndpointId, Router, Gossip, GossipSender, GossipReceiver)> {
            let topic_id = TopicId::from([0u8; 32]);
            let ep = Endpoint::empty_builder(RelayMode::Disabled)
                .secret_key(SecretKey::generate(rng))
                .bind()
                .await?;
            let endpoint_id = ep.id();
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            let topic = gossip.subscribe(topic_id, vec![]).await?;
            let (sender, receiver) = topic.split();
            Ok((endpoint_id, router, gossip, sender, receiver))
        }

        let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
        let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
        let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;

        println!("endpoints {:?}", [n1, n2, n3]);

        // The shared memory lookup only knows endpoint 1's address.
        let addr1 = r1.endpoint().addr();
        let lookup = MemoryLookup::new();
        lookup.add_endpoint_info(addr1);

        // Endpoint 2 joins via endpoint 1.
        r2.endpoint().address_lookup().add(lookup.clone());
        tx2.join_peers(vec![n1]).await?;

        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // Endpoint 3 also joins via endpoint 1; it has no direct address
        // information for endpoint 2.
        r3.endpoint().address_lookup().add(lookup.clone());
        tx3.join_peers(vec![n1]).await?;

        // Endpoint 3 must see two NeighborUp events (order unspecified)...
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 first neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 second neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));

        // ...and its neighbor set must be exactly {n1, n2}.
        assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));

        // Both endpoints 1 and 2 must see endpoint 3 come up as well.
        let ev = timeout(Duration::from_secs(3), rx2.next())
            .await
            .std_context("wait rx2 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        let ev = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait rx1 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown())
            .std_context("shutdown routers")?;
        Ok(())
    }
1885
1886 fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
1887 let mut out: Vec<_> = input.into_iter().collect();
1888 out.sort();
1889 out
1890 }
1891
1892 #[tokio::test]
1898 #[traced_test]
1899 async fn topic_stays_alive_after_sender_drop() -> n0_error::Result<()> {
1900 let topic_id = TopicId::from([99u8; 32]);
1901
1902 let ep1 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
1903 let ep2 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
1904 let gossip1 = Gossip::builder().spawn(ep1.clone());
1905 let gossip2 = Gossip::builder().spawn(ep2.clone());
1906 let router1 = Router::builder(ep1)
1907 .accept(crate::ALPN, gossip1.clone())
1908 .spawn();
1909 let router2 = Router::builder(ep2)
1910 .accept(crate::ALPN, gossip2.clone())
1911 .spawn();
1912
1913 let addr1 = router1.endpoint().addr();
1914 let id1 = addr1.id;
1915 let mem_lookup = MemoryLookup::new();
1916 mem_lookup.add_endpoint_info(addr1);
1917 router2.endpoint().address_lookup().add(mem_lookup);
1918
1919 let topic1 = gossip1.subscribe(topic_id, vec![]).await?;
1920 let topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;
1921
1922 let (tx1, mut rx1) = topic1.split();
1923 let (tx2, mut rx2) = topic2.split();
1924
1925 timeout(Duration::from_secs(3), rx1.joined())
1927 .await
1928 .std_context("wait rx1 join")??;
1929 timeout(Duration::from_secs(3), rx2.joined())
1930 .await
1931 .std_context("wait rx2 join")??;
1932
1933 drop(tx1);
1935
1936 tx2.broadcast(b"hello from node2".to_vec().into()).await?;
1938
1939 let event = timeout(Duration::from_secs(3), rx1.next())
1941 .await
1942 .std_context("wait for message on rx1")?;
1943
1944 match event {
1945 Some(Ok(Event::Received(msg))) => {
1946 assert_eq!(&msg.content[..], b"hello from node2");
1947 }
1948 other => panic!("expected Received event, got {:?}", other),
1949 }
1950
1951 drop(tx2);
1952 drop(rx1);
1953 drop(rx2);
1954 router1.shutdown().await.std_context("shutdown router1")?;
1955 router2.shutdown().await.std_context("shutdown router2")?;
1956 Ok(())
1957 }
1958}