1use std::{
4 collections::{hash_map::Entry, BTreeSet, HashMap, HashSet, VecDeque},
5 net::SocketAddr,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use bytes::Bytes;
12use futures_concurrency::stream::{stream_group, StreamGroup};
13use futures_util::FutureExt as _;
14use iroh::{
15 endpoint::Connection,
16 protocol::{AcceptError, ProtocolHandler},
17 Endpoint, EndpointAddr, EndpointId, PublicKey, RelayUrl, Watcher,
18};
19use irpc::WithChannels;
20use n0_error::{e, stack_error};
21use n0_future::{
22 task::{self, AbortOnDropHandle, JoinSet},
23 time::Instant,
24 Stream, StreamExt as _,
25};
26use rand::{rngs::StdRng, SeedableRng};
27use serde::{Deserialize, Serialize};
28use tokio::sync::{broadcast, mpsc, oneshot};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, error_span, trace, warn, Instrument};
31
32use self::{
33 discovery::GossipDiscovery,
34 util::{RecvLoop, SendLoop, Timers},
35};
36use crate::{
37 api::{self, Command, Event, GossipApi, RpcMessage},
38 metrics::Metrics,
39 proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId},
40};
41
42mod discovery;
43mod util;
44
/// The ALPN used for gossip connections unless a custom one is set via [`Builder::alpn`].
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/1";

/// Capacity of the per-connection queue of outgoing messages.
const SEND_QUEUE_CAP: usize = 64;
/// Capacity of the channel carrying RPC messages into the actor.
const TO_ACTOR_CAP: usize = 64;
/// Capacity of the channel carrying protocol `InEvent`s into the actor.
const IN_EVENT_CAP: usize = 1024;
/// Capacity of the per-topic broadcast channel of emitted events.
const TOPIC_EVENT_CAP: usize = 256;

/// Events emitted by the gossip protocol, instantiated with `PublicKey` peer ids.
pub type ProtoEvent = proto::Event<PublicKey>;
/// Commands accepted by the gossip protocol, instantiated with `PublicKey` peer ids.
pub type ProtoCommand = proto::Command<PublicKey>;

// Internal protocol type aliases, all keyed by `PublicKey`.
type InEvent = proto::InEvent<PublicKey>;
type OutEvent = proto::OutEvent<PublicKey>;
type Timer = proto::Timer<PublicKey>;
type ProtoMessage = proto::Message<PublicKey>;
66
/// Handle to a running gossip instance.
///
/// Cloning is cheap: all clones share the same actor through `Arc<Inner>`.
/// Derefs to [`GossipApi`] for the subscribe/join API.
#[derive(Debug, Clone)]
pub struct Gossip {
    pub(crate) inner: Arc<Inner>,
}
89
// Expose the RPC API directly on `Gossip` handles.
impl std::ops::Deref for Gossip {
    type Target = GossipApi;
    fn deref(&self) -> &Self::Target {
        &self.inner.api
    }
}
96
/// Control messages sent from [`Gossip`] handles to the actor, outside the RPC channel.
#[derive(Debug)]
enum LocalActorMessage {
    /// Hand an accepted incoming connection to the actor.
    HandleConnection(Connection),
    /// Quit all topics and stop the actor; `reply` is signalled when done.
    Shutdown { reply: oneshot::Sender<()> },
}
102
/// Errors returned by [`Gossip`] operations.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum Error {
    /// The gossip actor stopped and can no longer process requests.
    ActorDropped {},
}
109
// A failed send or receive on the actor channels means the actor side
// was dropped; map both to `Error::ActorDropped`.
impl<T> From<mpsc::error::SendError<T>> for Error {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(Error::ActorDropped)
    }
}
impl From<oneshot::error::RecvError> for Error {
    fn from(_value: oneshot::error::RecvError) -> Self {
        e!(Error::ActorDropped)
    }
}
120
/// Shared state behind a [`Gossip`] handle.
#[derive(Debug)]
pub(crate) struct Inner {
    // RPC API handle backing the `Deref` impl on `Gossip`.
    api: GossipApi,
    // Channel for connection hand-off and shutdown requests to the actor.
    local_tx: mpsc::Sender<LocalActorMessage>,
    // Keeps the actor task alive; aborts it when the last handle drops.
    _actor_handle: AbortOnDropHandle<()>,
    // Cached from the protocol config; see `Gossip::max_message_size`.
    max_message_size: usize,
    metrics: Arc<Metrics>,
}
129
impl ProtocolHandler for Gossip {
    /// Forwards an accepted connection to the gossip actor.
    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
        self.handle_connection(connection)
            .await
            .map_err(AcceptError::from_err)?;
        Ok(())
    }

    async fn shutdown(&self) {
        // Resolves to the inherent `Gossip::shutdown` (inherent methods take
        // precedence over trait methods), not to this trait method itself.
        if let Err(err) = self.shutdown().await {
            warn!("error while shutting down gossip: {err:#}");
        }
    }
}
144
/// Builder for [`Gossip`] instances.
#[derive(Debug, Clone)]
pub struct Builder {
    // Protocol configuration (membership, broadcast, max message size).
    config: proto::Config,
    // Custom ALPN; `GOSSIP_ALPN` is used when `None`.
    alpn: Option<Bytes>,
}
151
impl Builder {
    /// Sets the maximum message size in bytes.
    pub fn max_message_size(mut self, size: usize) -> Self {
        self.config.max_message_size = size;
        self
    }

    /// Sets the membership (HyParView) configuration.
    pub fn membership_config(mut self, config: HyparviewConfig) -> Self {
        self.config.membership = config;
        self
    }

    /// Sets the broadcast (Plumtree) configuration.
    pub fn broadcast_config(mut self, config: PlumtreeConfig) -> Self {
        self.config.broadcast = config;
        self
    }

    /// Sets a custom ALPN to use instead of [`GOSSIP_ALPN`].
    pub fn alpn(mut self, alpn: impl AsRef<[u8]>) -> Self {
        self.alpn = Some(alpn.as_ref().to_vec().into());
        self
    }

    /// Spawns the gossip actor on `endpoint` and returns a handle to it.
    pub fn spawn(self, endpoint: Endpoint) -> Gossip {
        let metrics = Arc::new(Metrics::default());
        // Register our discovery service so peer info learned via gossip
        // becomes available to the endpoint.
        let discovery = GossipDiscovery::default();
        endpoint.discovery().add(discovery.clone());
        let (actor, rpc_tx, local_tx) =
            Actor::new(endpoint, self.config, metrics.clone(), self.alpn, discovery);
        let me = actor.endpoint.id().fmt_short();
        let max_message_size = actor.state.max_message_size();

        let actor_handle = task::spawn(actor.run().instrument(error_span!("gossip", %me)));

        let api = GossipApi::local(rpc_tx);

        Gossip {
            inner: Inner {
                api,
                local_tx,
                // Dropping the last `Gossip` clone aborts the actor task.
                _actor_handle: AbortOnDropHandle::new(actor_handle),
                max_message_size,
                metrics,
            }
            .into(),
        }
    }
}
210
impl Gossip {
    /// Returns a [`Builder`] with default configuration.
    pub fn builder() -> Builder {
        Builder {
            config: Default::default(),
            alpn: None,
        }
    }

    /// Serves the gossip RPC API over the given QUIC endpoint.
    #[cfg(feature = "rpc")]
    pub async fn listen(self, endpoint: quinn::Endpoint) {
        self.inner.api.listen(endpoint).await
    }

    /// Returns the maximum message size configured for this instance.
    pub fn max_message_size(&self) -> usize {
        self.inner.max_message_size
    }

    /// Hands an incoming connection to the gossip actor.
    ///
    /// Returns [`Error::ActorDropped`] when the actor is no longer running.
    pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> {
        self.inner
            .local_tx
            .send(LocalActorMessage::HandleConnection(conn))
            .await?;
        Ok(())
    }

    /// Shuts down the actor: quits all topics and waits for confirmation.
    pub async fn shutdown(&self) -> Result<(), Error> {
        let (reply, reply_rx) = oneshot::channel();
        self.inner
            .local_tx
            .send(LocalActorMessage::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }

    /// Returns the metrics collected by this instance.
    pub fn metrics(&self) -> &Arc<Metrics> {
        &self.inner.metrics
    }
}
261
/// The gossip actor: owns the protocol state machine and all networking.
struct Actor {
    /// ALPN used for outgoing dials.
    alpn: Bytes,
    /// Protocol state machine (membership + broadcast).
    state: proto::State<PublicKey, StdRng>,
    endpoint: Endpoint,
    /// Queues and drives outgoing dials.
    dialer: Dialer,
    /// Incoming RPC messages from `GossipApi` handles.
    rpc_rx: mpsc::Receiver<RpcMessage>,
    /// Incoming connection hand-offs and shutdown requests.
    local_rx: mpsc::Receiver<LocalActorMessage>,
    /// Cloned into connection tasks so they can feed events back into the actor.
    in_event_tx: mpsc::Sender<InEvent>,
    in_event_rx: mpsc::Receiver<InEvent>,
    /// Pending protocol timers.
    timers: Timers<Timer>,
    /// Per-topic subscriber and command-stream bookkeeping.
    topics: HashMap<TopicId, TopicState>,
    /// Per-peer connection state.
    peers: HashMap<EndpointId, PeerState>,
    /// Merged stream of commands coming from all topic subscriptions.
    command_rx: stream_group::Keyed<TopicCommandStream>,
    /// Topics scheduled to be quit; drained by `process_quit_queue`.
    quit_queue: VecDeque<TopicId>,
    /// One send/receive task per connection.
    connection_tasks: JoinSet<(EndpointId, Connection, Result<(), ConnectionLoopError>)>,
    metrics: Arc<Metrics>,
    /// Tasks forwarding per-topic events to individual subscribers.
    topic_event_forwarders: JoinSet<TopicId>,
    /// Discovery service fed with peer addresses learned via gossip.
    discovery: GossipDiscovery,
}
294
295impl Actor {
    /// Creates the actor together with the sender halves of its RPC and
    /// local-control channels.
    fn new(
        endpoint: Endpoint,
        config: proto::Config,
        metrics: Arc<Metrics>,
        alpn: Option<Bytes>,
        discovery: GossipDiscovery,
    ) -> (
        Self,
        mpsc::Sender<RpcMessage>,
        mpsc::Sender<LocalActorMessage>,
    ) {
        let peer_id = endpoint.id();
        let dialer = Dialer::new(endpoint.clone());
        let state = proto::State::new(
            peer_id,
            // Peer data starts empty; it is filled by the first address
            // update in `setup`.
            Default::default(),
            config,
            rand::rngs::StdRng::from_rng(&mut rand::rng()),
        );
        let (rpc_tx, rpc_rx) = mpsc::channel(TO_ACTOR_CAP);
        let (local_tx, local_rx) = mpsc::channel(16);
        let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);

        let actor = Actor {
            // Fall back to the default ALPN when none was configured.
            alpn: alpn.unwrap_or_else(|| GOSSIP_ALPN.to_vec().into()),
            endpoint,
            state,
            dialer,
            rpc_rx,
            in_event_rx,
            in_event_tx,
            timers: Timers::new(),
            command_rx: StreamGroup::new().keyed(),
            peers: Default::default(),
            topics: Default::default(),
            quit_queue: Default::default(),
            connection_tasks: Default::default(),
            metrics,
            local_rx,
            topic_event_forwarders: Default::default(),
            discovery,
        };

        (actor, rpc_tx, local_tx)
    }
341
342 pub async fn run(mut self) {
343 let mut addr_update_stream = self.setup().await;
344
345 let mut i = 0;
346 while self.event_loop(&mut addr_update_stream, i).await {
347 i += 1;
348 }
349 }
350
    /// Subscribes to endpoint address updates and seeds the protocol state
    /// with our current address before the event loop starts.
    async fn setup(&mut self) -> impl Stream<Item = EndpointAddr> + Send + Unpin + use<> {
        let addr_update_stream = self.endpoint.watch_addr().stream();
        let initial_addr = self.endpoint.addr();
        self.handle_addr_update(initial_addr).await;
        addr_update_stream
    }
361
    /// Runs a single iteration of the actor's main loop.
    ///
    /// Returns `false` when the actor should stop (shutdown requested or all
    /// handles dropped). `i` is a tick counter used only for tracing.
    async fn event_loop(
        &mut self,
        addr_updates: &mut (impl Stream<Item = EndpointAddr> + Send + Unpin),
        i: usize,
    ) -> bool {
        self.metrics.actor_tick_main.inc();
        tokio::select! {
            // `biased` polls the arms in order, giving local control
            // messages priority over protocol traffic.
            biased;
            conn = self.local_rx.recv() => {
                match conn {
                    Some(LocalActorMessage::Shutdown { reply }) => {
                        debug!("received shutdown message, quit all topics");
                        self.quit_queue.extend(self.topics.keys().copied());
                        self.process_quit_queue().await;
                        debug!("all topics quit, stop gossip actor");
                        reply.send(()).ok();
                        return false;
                    },
                    Some(LocalActorMessage::HandleConnection(conn)) => {
                        self.handle_connection(conn.remote_id(), ConnOrigin::Accept, conn);
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            }
            msg = self.rpc_rx.recv() => {
                trace!(?i, "tick: to_actor_rx");
                self.metrics.actor_tick_rx.inc();
                match msg {
                    Some(msg) => {
                        self.handle_rpc_msg(msg, Instant::now()).await;
                    }
                    None => {
                        debug!("all gossip handles dropped, stop gossip actor");
                        return false;
                    }
                }
            },
            Some((key, (topic, command))) = self.command_rx.next(), if !self.command_rx.is_empty() => {
                trace!(?i, "tick: command_rx");
                self.handle_command(topic, key, command).await;
            },
            Some(new_address) = addr_updates.next() => {
                trace!(?i, "tick: new_address");
                self.metrics.actor_tick_endpoint.inc();
                self.handle_addr_update(new_address).await;
            }
            (peer_id, res) = self.dialer.next_conn() => {
                trace!(?i, "tick: dialer");
                self.metrics.actor_tick_dialer.inc();
                match res {
                    Some(Ok(conn)) => {
                        debug!(peer = %peer_id.fmt_short(), "dial successful");
                        self.metrics.actor_tick_dialer_success.inc();
                        self.handle_connection(peer_id, ConnOrigin::Dial, conn);
                    }
                    Some(Err(err)) => {
                        warn!(peer = %peer_id.fmt_short(), "dial failed: {err}");
                        self.metrics.actor_tick_dialer_failure.inc();
                        // Only mark the peer disconnected if no accepted
                        // connection made it active in the meantime.
                        let peer_state = self.peers.get(&peer_id);
                        let is_active = matches!(peer_state, Some(PeerState::Active { .. }));
                        if !is_active {
                            self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
                                .await;
                        }
                    }
                    None => {
                        // `None` result means the dial was cancelled.
                        warn!(peer = %peer_id.fmt_short(), "dial disconnected");
                        self.metrics.actor_tick_dialer_failure.inc();
                    }
                }
            }
            event = self.in_event_rx.recv() => {
                trace!(?i, "tick: in_event_rx");
                self.metrics.actor_tick_in_event_rx.inc();
                let event = event.expect("unreachable: in_event_tx is never dropped before receiver");
                self.handle_in_event(event, Instant::now()).await;
            }
            _ = self.timers.wait_next() => {
                trace!(?i, "tick: timers");
                self.metrics.actor_tick_timers.inc();
                // Drain every timer that is already due.
                let now = Instant::now();
                while let Some((_instant, timer)) = self.timers.pop_before(now) {
                    self.handle_in_event(InEvent::TimerExpired(timer), now).await;
                }
            }
            Some(res) = self.connection_tasks.join_next(), if !self.connection_tasks.is_empty() => {
                trace!(?i, "tick: connection_tasks");
                let (peer_id, conn, result) = res.expect("connection task panicked");
                self.handle_connection_task_finished(peer_id, conn, result).await;
            }
            Some(res) = self.topic_event_forwarders.join_next(), if !self.topic_event_forwarders.is_empty() => {
                // A subscriber went away; quit the topic when nothing else
                // still needs it.
                let topic_id = res.expect("topic event forwarder panicked");
                if let Some(state) = self.topics.get_mut(&topic_id) {
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                        self.process_quit_queue().await;
                    }
                }
            }
        }

        true
    }
471
    /// Publishes our (possibly changed) address as peer data to the swarm.
    async fn handle_addr_update(&mut self, endpoint_addr: EndpointAddr) {
        let peer_data = encode_peer_data(&endpoint_addr.into());
        self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
            .await
    }
478
    /// Handles one item from a topic's command stream.
    ///
    /// `command == None` means the stream with key `key` ended; the topic is
    /// quit once no command stream or subscriber needs it anymore.
    async fn handle_command(
        &mut self,
        topic: TopicId,
        key: stream_group::Key,
        command: Option<Command>,
    ) {
        debug!(?topic, ?key, ?command, "handle command");
        let Some(state) = self.topics.get_mut(&topic) else {
            warn!("received command for unknown topic");
            return;
        };
        match command {
            Some(command) => {
                // Translate the public API command into a protocol command.
                let command = match command {
                    Command::Broadcast(message) => ProtoCommand::Broadcast(message, Scope::Swarm),
                    Command::BroadcastNeighbors(message) => {
                        ProtoCommand::Broadcast(message, Scope::Neighbors)
                    }
                    Command::JoinPeers(peers) => ProtoCommand::Join(peers),
                };
                self.handle_in_event(proto::InEvent::Command(topic, command), Instant::now())
                    .await;
            }
            None => {
                state.command_rx_keys.remove(&key);
                if !state.still_needed() {
                    self.quit_queue.push_back(topic);
                    self.process_quit_queue().await;
                }
            }
        }
    }
512
    /// Registers `conn` as the active connection for `peer_id` and spawns its
    /// send/receive loops on the connection task set.
    fn handle_connection(&mut self, peer_id: EndpointId, origin: ConnOrigin, conn: Connection) {
        let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
        let conn_id = conn.stable_id();

        // Make this the active connection; messages queued while the peer
        // was pending are returned in `queue` and flushed by the send loop.
        let queue = match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => entry.get_mut().accept_conn(send_tx, conn_id),
            Entry::Vacant(entry) => {
                entry.insert(PeerState::Active {
                    active_send_tx: send_tx,
                    active_conn_id: conn_id,
                    other_conns: Vec::new(),
                });
                Vec::new()
            }
        };

        let max_message_size = self.state.max_message_size();
        let in_event_tx = self.in_event_tx.clone();

        self.connection_tasks.spawn(
            async move {
                let res = connection_loop(
                    peer_id,
                    conn.clone(),
                    origin,
                    send_rx,
                    in_event_tx,
                    max_message_size,
                    queue,
                )
                .await;
                (peer_id, conn, res)
            }
            .instrument(error_span!("conn", peer = %peer_id.fmt_short())),
        );
    }
550
551 #[tracing::instrument(name = "conn", skip_all, fields(peer = %peer_id.fmt_short()))]
552 async fn handle_connection_task_finished(
553 &mut self,
554 peer_id: EndpointId,
555 conn: Connection,
556 task_result: Result<(), ConnectionLoopError>,
557 ) {
558 if conn.close_reason().is_none() {
559 conn.close(0u32.into(), b"close from disconnect");
560 }
561 let reason = conn.close_reason().expect("just closed");
562 let error = task_result.err();
563 debug!(%reason, ?error, "connection closed");
564 if let Some(PeerState::Active {
565 active_conn_id,
566 other_conns,
567 ..
568 }) = self.peers.get_mut(&peer_id)
569 {
570 if conn.stable_id() == *active_conn_id {
571 debug!("active send connection closed, mark peer as disconnected");
572 self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now())
573 .await;
574 } else {
575 other_conns.retain(|x| *x != conn.stable_id());
576 debug!("remaining {} other connections", other_conns.len() + 1);
577 }
578 } else {
579 debug!("peer already marked as disconnected");
580 }
581 }
582
    /// Handles an RPC message from a `GossipApi` handle (currently only `Join`).
    async fn handle_rpc_msg(&mut self, msg: RpcMessage, now: Instant) {
        trace!("handle to_actor {msg:?}");
        match msg {
            RpcMessage::Join(msg) => {
                let WithChannels {
                    inner,
                    rx,
                    tx,
                    span: _,
                } = msg;
                let api::JoinRequest {
                    topic_id,
                    bootstrap,
                } = inner;
                let TopicState {
                    neighbors,
                    event_sender,
                    command_rx_keys,
                } = self.topics.entry(topic_id).or_default();
                // Replay the current neighbor set to the new subscriber so it
                // does not miss `NeighborUp` events from before it joined.
                let mut sender_dead = false;
                if !neighbors.is_empty() {
                    for neighbor in neighbors.iter() {
                        if let Err(_err) = tx.try_send(Event::NeighborUp(*neighbor)).await {
                            sender_dead = true;
                            break;
                        }
                    }
                }

                // Forward future topic events, unless the subscriber is
                // already gone.
                if !sender_dead {
                    let fut =
                        topic_subscriber_loop(tx, event_sender.subscribe()).map(move |_| topic_id);
                    self.topic_event_forwarders
                        .spawn(fut.instrument(tracing::Span::current()));
                }
                // Register the subscriber's command stream with the actor.
                let command_rx = TopicCommandStream::new(topic_id, Box::pin(rx.into_stream()));
                let key = self.command_rx.insert(command_rx);
                command_rx_keys.insert(key);

                self.handle_in_event(
                    InEvent::Command(
                        topic_id,
                        ProtoCommand::Join(bootstrap.into_iter().collect()),
                    ),
                    now,
                )
                .await;
            }
        }
    }
634
    /// Feeds one event into the protocol state, then drains the quit queue.
    async fn handle_in_event(&mut self, event: InEvent, now: Instant) {
        self.handle_in_event_inner(event, now).await;
        self.process_quit_queue().await;
    }
639
    /// Issues a `Quit` command for every queued topic and drops its state.
    async fn process_quit_queue(&mut self) {
        while let Some(topic_id) = self.quit_queue.pop_front() {
            self.handle_in_event_inner(
                InEvent::Command(topic_id, ProtoCommand::Quit),
                Instant::now(),
            )
            .await;
            if self.topics.remove(&topic_id).is_some() {
                tracing::debug!(%topic_id, "publishers and subscribers gone; unsubscribing");
            }
        }
    }
652
    /// Feeds one event into the protocol state machine and executes all
    /// resulting `OutEvent`s (sends, subscriber events, timers, peer data).
    async fn handle_in_event_inner(&mut self, event: InEvent, now: Instant) {
        // Timer expirations are frequent; log them at trace level only.
        if matches!(event, InEvent::TimerExpired(_)) {
            trace!(?event, "handle in_event");
        } else {
            debug!(?event, "handle in_event");
        };
        let out = self.state.handle(event, now, Some(&self.metrics));
        for event in out {
            if matches!(event, OutEvent::ScheduleTimer(_, _)) {
                trace!(?event, "handle out_event");
            } else {
                debug!(?event, "handle out_event");
            };
            match event {
                OutEvent::SendMessage(peer_id, message) => {
                    let state = self.peers.entry(peer_id).or_default();
                    match state {
                        PeerState::Active { active_send_tx, .. } => {
                            if let Err(_err) = active_send_tx.send(message).await {
                                // Connection task terminated; the message is dropped.
                                warn!(
                                    peer = %peer_id.fmt_short(),
                                    "failed to send: connection task send loop terminated",
                                );
                            }
                        }
                        PeerState::Pending { queue } => {
                            // First message to a pending peer triggers a dial;
                            // messages are queued until the dial succeeds.
                            if queue.is_empty() {
                                debug!(peer = %peer_id.fmt_short(), "start to dial");
                                self.dialer.queue_dial(peer_id, self.alpn.clone());
                            }
                            queue.push(message);
                        }
                    }
                }
                OutEvent::EmitEvent(topic_id, event) => {
                    let Some(state) = self.topics.get_mut(&topic_id) else {
                        warn!(?topic_id, "gossip state emitted event for unknown topic");
                        continue;
                    };
                    let TopicState {
                        neighbors,
                        event_sender,
                        ..
                    } = state;
                    // Track the neighbor set so it can be replayed to late
                    // subscribers when they join.
                    match &event {
                        ProtoEvent::NeighborUp(neighbor) => {
                            neighbors.insert(*neighbor);
                        }
                        ProtoEvent::NeighborDown(neighbor) => {
                            neighbors.remove(neighbor);
                        }
                        _ => {}
                    }
                    event_sender.send(event).ok();
                    if !state.still_needed() {
                        self.quit_queue.push_back(topic_id);
                    }
                }
                OutEvent::ScheduleTimer(delay, timer) => {
                    self.timers.insert(now + delay, timer);
                }
                OutEvent::DisconnectPeer(peer_id) => {
                    debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer");
                    self.peers.remove(&peer_id);
                }
                OutEvent::PeerData(endpoint_id, data) => match decode_peer_data(&data) {
                    Err(err) => warn!("Failed to decode {data:?} from {endpoint_id}: {err}"),
                    Ok(info) => {
                        // Feed addresses learned via gossip into discovery so
                        // the endpoint can dial this peer.
                        debug!(peer = ?endpoint_id, "add known addrs: {info:?}");
                        let mut endpoint_addr = EndpointAddr::new(endpoint_id);
                        for addr in info.direct_addresses {
                            endpoint_addr = endpoint_addr.with_ip_addr(addr);
                        }
                        if let Some(relay_url) = info.relay_url {
                            endpoint_addr = endpoint_addr.with_relay_url(relay_url);
                        }

                        self.discovery.add(endpoint_addr);
                    }
                },
            }
        }
    }
740}
741
/// Stable id of a connection, used to tell multiple connections to the same peer apart.
type ConnId = usize;

/// Connection state of a single peer.
#[derive(Debug)]
enum PeerState {
    /// No active connection; messages are queued until a dial succeeds.
    Pending {
        queue: Vec<ProtoMessage>,
    },
    /// A live connection exists; `active_send_tx` feeds its send loop.
    Active {
        active_send_tx: mpsc::Sender<ProtoMessage>,
        active_conn_id: ConnId,
        /// Ids of superseded connections whose tasks have not yet finished.
        other_conns: Vec<ConnId>,
    },
}
755
756impl PeerState {
757 fn accept_conn(
758 &mut self,
759 send_tx: mpsc::Sender<ProtoMessage>,
760 conn_id: ConnId,
761 ) -> Vec<ProtoMessage> {
762 match self {
763 PeerState::Pending { queue } => {
764 let queue = std::mem::take(queue);
765 *self = PeerState::Active {
766 active_send_tx: send_tx,
767 active_conn_id: conn_id,
768 other_conns: Vec::new(),
769 };
770 queue
771 }
772 PeerState::Active {
773 active_send_tx,
774 active_conn_id,
775 other_conns,
776 } => {
777 other_conns.push(*active_conn_id);
783 *active_send_tx = send_tx;
784 *active_conn_id = conn_id;
785 Vec::new()
786 }
787 }
788 }
789}
790
impl Default for PeerState {
    // New peers start out pending, with an empty outgoing queue.
    fn default() -> Self {
        PeerState::Pending { queue: Vec::new() }
    }
}
796
/// Actor-side bookkeeping for a single topic.
#[derive(Debug)]
struct TopicState {
    /// Current neighbors, maintained from NeighborUp/NeighborDown events.
    neighbors: BTreeSet<EndpointId>,
    /// Broadcasts protocol events to all subscribers of this topic.
    event_sender: broadcast::Sender<ProtoEvent>,
    /// Keys of command streams currently attached to this topic.
    command_rx_keys: HashSet<stream_group::Key>,
}
806
807impl Default for TopicState {
808 fn default() -> Self {
809 let (event_sender, _) = broadcast::channel(TOPIC_EVENT_CAP);
810 Self {
811 neighbors: Default::default(),
812 command_rx_keys: Default::default(),
813 event_sender,
814 }
815 }
816}
817
818impl TopicState {
819 fn still_needed(&self) -> bool {
821 !self.command_rx_keys.is_empty() || self.event_sender.receiver_count() > 0
824 }
825
826 #[cfg(test)]
827 fn joined(&self) -> bool {
828 !self.neighbors.is_empty()
829 }
830}
831
/// Whether a connection was accepted (incoming) or dialed (outgoing).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnOrigin {
    Accept,
    Dial,
}
838
/// Errors that can terminate a connection's send/receive loops.
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
enum ConnectionLoopError {
    #[error(transparent)]
    Write {
        source: self::util::WriteError,
    },
    #[error(transparent)]
    Read {
        source: self::util::ReadError,
    },
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: iroh::endpoint::ConnectionError,
    },
    /// The actor's in-event channel closed (actor stopped).
    ActorDropped {},
}
858
// A failed send to the actor means the actor side was dropped.
impl<T> From<mpsc::error::SendError<T>> for ConnectionLoopError {
    fn from(_value: mpsc::error::SendError<T>) -> Self {
        e!(ConnectionLoopError::ActorDropped)
    }
}
864
/// Runs the send and receive loops of one connection until both finish.
///
/// `queue` holds messages buffered while the peer was pending; the send loop
/// flushes them first. Errors from the send loop are reported before errors
/// from the receive loop.
async fn connection_loop(
    from: PublicKey,
    conn: Connection,
    origin: ConnOrigin,
    send_rx: mpsc::Receiver<ProtoMessage>,
    in_event_tx: mpsc::Sender<InEvent>,
    max_message_size: usize,
    queue: Vec<ProtoMessage>,
) -> Result<(), ConnectionLoopError> {
    debug!(?origin, "connection established");

    let mut send_loop = SendLoop::new(conn.clone(), send_rx, max_message_size);
    let mut recv_loop = RecvLoop::new(from, conn, in_event_tx, max_message_size);

    let send_fut = send_loop.run(queue).instrument(error_span!("send"));
    let recv_fut = recv_loop.run().instrument(error_span!("recv"));

    // Run both loops concurrently until both complete.
    let (send_res, recv_res) = tokio::join!(send_fut, recv_fut);
    send_res?;
    recv_res?;
    Ok(())
}
887
/// Addressing information exchanged as gossip peer data (postcard-encoded).
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
struct AddrInfo {
    relay_url: Option<RelayUrl>,
    direct_addresses: BTreeSet<SocketAddr>,
}
893
impl From<EndpointAddr> for AddrInfo {
    // Note: only the first relay URL is kept; additional ones are dropped.
    fn from(endpoint_addr: EndpointAddr) -> Self {
        Self {
            relay_url: endpoint_addr.relay_urls().next().cloned(),
            direct_addresses: endpoint_addr.ip_addrs().cloned().collect(),
        }
    }
}
902
903fn encode_peer_data(info: &AddrInfo) -> PeerData {
904 let bytes = postcard::to_stdvec(info).expect("serializing AddrInfo may not fail");
905 PeerData::new(bytes)
906}
907
908fn decode_peer_data(peer_data: &PeerData) -> Result<AddrInfo, postcard::Error> {
909 let bytes = peer_data.as_bytes();
910 if bytes.is_empty() {
911 return Ok(AddrInfo::default());
912 }
913 let info = postcard::from_bytes(bytes)?;
914 Ok(info)
915}
916
/// Forwards events from a topic's broadcast channel to one subscriber.
///
/// Terminates when the broadcast channel closes or the subscriber goes away.
/// If the subscriber lags behind, a single `Event::Lagged` is delivered in
/// place of the missed events.
async fn topic_subscriber_loop(
    sender: irpc::channel::mpsc::Sender<Event>,
    mut topic_events: broadcast::Receiver<ProtoEvent>,
) {
    loop {
        tokio::select! {
            biased;
            msg = topic_events.recv() => {
                let event = match msg {
                    Err(broadcast::error::RecvError::Closed) => break,
                    Err(broadcast::error::RecvError::Lagged(_)) => Event::Lagged,
                    Ok(event) => event.into(),
                };
                if sender.send(event).await.is_err() {
                    break;
                }
            }
            // Stop as soon as the subscriber's receive side is dropped.
            _ = sender.closed() => break,
        }
    }
}
938
/// Boxed stream of commands received from a topic subscriber.
type BoxedCommandReceiver =
    n0_future::stream::Boxed<Result<Command, irpc::channel::mpsc::RecvError>>;
942
/// Stream of commands for a single topic, tagged with its [`TopicId`].
#[derive(derive_more::Debug)]
struct TopicCommandStream {
    topic_id: TopicId,
    #[debug("CommandStream")]
    stream: BoxedCommandReceiver,
    // Set once the final `(topic_id, None)` item has been yielded.
    closed: bool,
}
950
impl TopicCommandStream {
    /// Wraps `stream` so its items are tagged with `topic_id`.
    fn new(topic_id: TopicId, stream: BoxedCommandReceiver) -> Self {
        Self {
            topic_id,
            stream,
            closed: false,
        }
    }
}
960
// Yields `(topic_id, Some(command))` per command. When the inner stream ends
// or errors, yields one final `(topic_id, None)` so the actor can clean up,
// then terminates.
impl Stream for TopicCommandStream {
    type Item = (TopicId, Option<Command>);
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.closed {
            return Poll::Ready(None);
        }
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Ready(Some(Ok(item))) => Poll::Ready(Some((self.topic_id, Some(item)))),
            Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
                self.closed = true;
                Poll::Ready(Some((self.topic_id, None)))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
977
/// Dials peers and keeps track of pending dials, with cancellation support.
#[derive(Debug)]
struct Dialer {
    endpoint: Endpoint,
    /// In-flight dial tasks; a `None` result means the dial was cancelled.
    pending: JoinSet<(
        EndpointId,
        Option<Result<Connection, iroh::endpoint::ConnectError>>,
    )>,
    /// Cancellation tokens for dials that have not completed yet.
    pending_dials: HashMap<EndpointId, CancellationToken>,
}
987
988impl Dialer {
989 fn new(endpoint: Endpoint) -> Self {
991 Self {
992 endpoint,
993 pending: Default::default(),
994 pending_dials: Default::default(),
995 }
996 }
997
998 fn queue_dial(&mut self, endpoint_id: EndpointId, alpn: Bytes) {
1000 if self.is_pending(endpoint_id) {
1001 return;
1002 }
1003 let cancel = CancellationToken::new();
1004 self.pending_dials.insert(endpoint_id, cancel.clone());
1005 let endpoint = self.endpoint.clone();
1006 self.pending.spawn(
1007 async move {
1008 let res = tokio::select! {
1009 biased;
1010 _ = cancel.cancelled() => None,
1011 res = endpoint.connect(endpoint_id, &alpn) => Some(res),
1012 };
1013 (endpoint_id, res)
1014 }
1015 .instrument(tracing::Span::current()),
1016 );
1017 }
1018
1019 fn is_pending(&self, endpoint: EndpointId) -> bool {
1021 self.pending_dials.contains_key(&endpoint)
1022 }
1023
1024 async fn next_conn(
1027 &mut self,
1028 ) -> (
1029 EndpointId,
1030 Option<Result<Connection, iroh::endpoint::ConnectError>>,
1031 ) {
1032 match self.pending_dials.is_empty() {
1033 false => {
1034 let (endpoint_id, res) = loop {
1035 match self.pending.join_next().await {
1036 Some(Ok((endpoint_id, res))) => {
1037 self.pending_dials.remove(&endpoint_id);
1038 break (endpoint_id, res);
1039 }
1040 Some(Err(e)) => {
1041 error!("next conn error: {:?}", e);
1042 }
1043 None => {
1044 error!("no more pending conns available");
1045 std::future::pending().await
1046 }
1047 }
1048 };
1049
1050 (endpoint_id, res)
1051 }
1052 true => std::future::pending().await,
1053 }
1054 }
1055}
1056
1057#[cfg(test)]
1058pub(crate) mod test {
1059 use std::{future::Future, time::Duration};
1060
1061 use bytes::Bytes;
1062 use futures_concurrency::future::TryJoin;
1063 use iroh::{
1064 discovery::static_provider::StaticProvider, endpoint::BindError, protocol::Router,
1065 RelayMap, RelayMode, SecretKey,
1066 };
1067 use n0_error::{AnyError, Result, StdResultExt};
1068 use n0_tracing_test::traced_test;
1069 use rand::{CryptoRng, Rng};
1070 use tokio::{spawn, time::timeout};
1071 use tokio_util::sync::CancellationToken;
1072 use tracing::{info, instrument};
1073
1074 use super::*;
1075 use crate::api::{ApiError, GossipReceiver, GossipSender};
1076
    /// Test helper that drives an [`Actor`]'s event loop one step at a time.
    struct ManualActorLoop {
        actor: Actor,
        // Tick counter passed to `event_loop` for tracing.
        step: usize,
    }
1081
    // Deref to the wrapped actor for convenient field access in tests.
    impl std::ops::Deref for ManualActorLoop {
        type Target = Actor;

        fn deref(&self) -> &Self::Target {
            &self.actor
        }
    }

    impl std::ops::DerefMut for ManualActorLoop {
        fn deref_mut(&mut self) -> &mut Self::Target {
            &mut self.actor
        }
    }
1095
1096 type EndpointHandle = tokio::task::JoinHandle<Result<()>>;
1097
    impl ManualActorLoop {
        /// Wraps `actor` after running its setup; the real address update
        /// stream is discarded (steps use a never-yielding stream instead).
        #[instrument(skip_all, fields(me = %actor.endpoint.id().fmt_short()))]
        async fn new(mut actor: Actor) -> Self {
            let _ = actor.setup().await;
            Self { actor, step: 0 }
        }

        /// Runs a single event-loop iteration; returns `false` when the
        /// actor wants to stop.
        #[instrument(skip_all, fields(me = %self.endpoint.id().fmt_short()))]
        async fn step(&mut self) -> bool {
            let ManualActorLoop { actor, step } = self;
            *step += 1;
            // Address updates are not exercised in manual stepping.
            let addr_update_stream = &mut futures_lite::stream::pending();
            actor.event_loop(addr_update_stream, *step).await
        }

        /// Runs `n` event-loop iterations.
        async fn steps(&mut self, n: usize) {
            for _ in 0..n {
                self.step().await;
            }
        }

        /// Runs the event loop until the actor stops.
        async fn finish(mut self) {
            while self.step().await {}
        }
    }
1125
    impl Gossip {
        /// Creates a gossip handle whose actor is returned instead of being
        /// spawned, so tests can drive it manually. Also spawns an accept
        /// loop for the endpoint.
        async fn t_new_with_actor(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Actor, EndpointHandle), BindError> {
            let endpoint = create_endpoint(rng, relay_map, None).await?;
            let metrics = Arc::new(Metrics::default());
            let discovery = GossipDiscovery::default();
            endpoint.discovery().add(discovery.clone());

            let (actor, to_actor_tx, conn_tx) =
                Actor::new(endpoint, config, metrics.clone(), None, discovery);
            let max_message_size = actor.state.max_message_size();

            // The actor is not spawned here; a never-ready task stands in
            // for the handle kept alive by `Inner`.
            let _actor_handle =
                AbortOnDropHandle::new(task::spawn(futures_lite::future::pending()));
            let gossip = Self {
                inner: Inner {
                    api: GossipApi::local(to_actor_tx),
                    local_tx: conn_tx,
                    _actor_handle,
                    max_message_size,
                    metrics,
                }
                .into(),
            };

            let endpoint_task = task::spawn(endpoint_loop(
                actor.endpoint.clone(),
                gossip.clone(),
                cancel.child_token(),
            ));

            Ok((gossip, actor, endpoint_task))
        }

        /// Like `t_new_with_actor`, but spawns the actor as usual and
        /// returns a guard that aborts it on drop.
        async fn t_new(
            rng: &mut rand_chacha::ChaCha12Rng,
            config: proto::Config,
            relay_map: RelayMap,
            cancel: &CancellationToken,
        ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop + use<>), BindError> {
            let (g, actor, ep_handle) =
                Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?;
            let ep = actor.endpoint.clone();
            let me = ep.id().fmt_short();
            let actor_handle =
                task::spawn(actor.run().instrument(tracing::error_span!("gossip", %me)));
            Ok((g, ep, ep_handle, AbortOnDropHandle::new(actor_handle)))
        }
    }
1186
    /// Binds a test endpoint on the given relay map, optionally wiring in a
    /// static discovery provider, and waits until it is online.
    pub(crate) async fn create_endpoint(
        rng: &mut rand_chacha::ChaCha12Rng,
        relay_map: RelayMap,
        static_provider: Option<StaticProvider>,
    ) -> Result<Endpoint, BindError> {
        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .secret_key(SecretKey::generate(rng))
            .alpns(vec![GOSSIP_ALPN.to_vec()])
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .await?;

        if let Some(static_provider) = static_provider {
            ep.discovery().add(static_provider);
        }
        ep.online().await;
        Ok(ep)
    }
1205
    /// Accepts incoming connections on `endpoint` and hands them to `gossip`
    /// until cancelled or the endpoint is closed.
    async fn endpoint_loop(
        endpoint: Endpoint,
        gossip: Gossip,
        cancel: CancellationToken,
    ) -> Result<()> {
        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => break,
                incoming = endpoint.accept() => match incoming {
                    None => break,
                    Some(incoming) => {
                        let connecting = match incoming.accept() {
                            Ok(connecting) => connecting,
                            Err(err) => {
                                warn!("incoming connection failed: {err:#}");
                                // Keep accepting after a failed handshake.
                                continue;
                            }
                        };
                        let connection = connecting
                            .await
                            .std_context("await incoming connection")?;
                        gossip.handle_connection(connection).await?
                    }
                }
            }
        }
        Ok(())
    }
1237
    /// Smoke test: three gossip endpoints connect via a relay, join one topic in a
    /// chain (2 bootstraps via 1, 3 via 2), and both subscribers receive every
    /// message broadcast by the first peer, in order.
    #[tokio::test]
    #[traced_test]
    async fn gossip_net_smoke() {
        // Deterministic RNG so endpoint keys are stable across runs.
        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        // Shared discovery store; every endpoint gets a clone so they can resolve
        // each other's relay addresses.
        let static_provider = StaticProvider::new();

        let ep1 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
            .await
            .unwrap();
        let ep2 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
            .await
            .unwrap();
        let ep3 = create_endpoint(&mut rng, relay_map.clone(), Some(static_provider.clone()))
            .await
            .unwrap();

        let go1 = Gossip::builder().spawn(ep1.clone());
        let go2 = Gossip::builder().spawn(ep2.clone());
        let go3 = Gossip::builder().spawn(ep3.clone());
        debug!("peer1 {:?}", ep1.id());
        debug!("peer2 {:?}", ep2.id());
        debug!("peer3 {:?}", ep3.id());
        let pi1 = ep1.id();
        let pi2 = ep2.id();

        // Accept loops for all three endpoints; stopped via `cancel` at the end.
        let cancel = CancellationToken::new();
        let tasks = [
            spawn(endpoint_loop(ep1.clone(), go1.clone(), cancel.clone())),
            spawn(endpoint_loop(ep2.clone(), go2.clone(), cancel.clone())),
            spawn(endpoint_loop(ep3.clone(), go3.clone(), cancel.clone())),
        ];

        debug!("----- adding peers ----- ");
        let topic: TopicId = blake3::hash(b"foobar").into();

        // Publish peer 1's and peer 2's addresses so the bootstrap joins below
        // can dial them.
        let addr1 = EndpointAddr::new(pi1).with_relay_url(relay_url.clone());
        let addr2 = EndpointAddr::new(pi2).with_relay_url(relay_url);
        static_provider.add_endpoint_info(addr1.clone());
        static_provider.add_endpoint_info(addr2.clone());

        debug!("----- joining ----- ");
        // Chain topology: peer 2 bootstraps via peer 1, peer 3 via peer 2.
        let [sub1, mut sub2, mut sub3] = [
            go1.subscribe_and_join(topic, vec![]),
            go2.subscribe_and_join(topic, vec![pi1]),
            go3.subscribe_and_join(topic, vec![pi2]),
        ]
        .try_join()
        .await
        .unwrap();

        let (sink1, _stream1) = sub1.split();

        // Number of messages broadcast by peer 1.
        let len = 2;

        // Publisher: broadcast `len` messages with a tiny pause in between.
        let pub1 = spawn(async move {
            for i in 0..len {
                let message = format!("hi{i}");
                info!("go1 broadcast: {message:?}");
                sink1.broadcast(message.into_bytes().into()).await.unwrap();
                tokio::time::sleep(Duration::from_micros(1)).await;
            }
        });

        // Subscriber on peer 2: collect received payloads until `len` arrived.
        let sub2 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub2.next().await.unwrap().unwrap();
                info!("go2 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        // Subscriber on peer 3: same collection loop as peer 2.
        let sub3 = spawn(async move {
            let mut recv = vec![];
            loop {
                let ev = sub3.next().await.unwrap().unwrap();
                info!("go3 event: {ev:?}");
                if let Event::Received(msg) = ev {
                    recv.push(msg.content);
                }
                if recv.len() == len {
                    return recv;
                }
            }
        });

        timeout(Duration::from_secs(10), pub1)
            .await
            .unwrap()
            .unwrap();
        let recv2 = timeout(Duration::from_secs(10), sub2)
            .await
            .unwrap()
            .unwrap();
        let recv3 = timeout(Duration::from_secs(10), sub3)
            .await
            .unwrap()
            .unwrap();

        // Both subscribers must have received all messages, in broadcast order.
        let expected: Vec<Bytes> = (0..len)
            .map(|i| Bytes::from(format!("hi{i}").into_bytes()))
            .collect();
        assert_eq!(recv2, expected);
        assert_eq!(recv3, expected);

        // Stop the accept loops and make sure they shut down cleanly.
        cancel.cancel();
        for t in tasks {
            timeout(Duration::from_secs(10), t)
                .await
                .unwrap()
                .unwrap()
                .unwrap();
        }
    }
1363
1364 #[tokio::test]
1374 #[traced_test]
1375 async fn subscription_cleanup() -> Result {
1376 let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
1377 let ct = CancellationToken::new();
1378 let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
1379
1380 let (go1, actor, ep1_handle) =
1382 Gossip::t_new_with_actor(rng, Default::default(), relay_map.clone(), &ct).await?;
1383 let mut actor = ManualActorLoop::new(actor).await;
1384
1385 let (go2, ep2, ep2_handle, _test_actor_handle) =
1387 Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;
1388
1389 let endpoint_id1 = actor.endpoint.id();
1390 let endpoint_id2 = ep2.id();
1391 tracing::info!(
1392 endpoint_1 = %endpoint_id1.fmt_short(),
1393 endpoint_2 = %endpoint_id2.fmt_short(),
1394 "endpoints ready"
1395 );
1396
1397 let topic: TopicId = blake3::hash(b"subscription_cleanup").into();
1398 tracing::info!(%topic, "joining");
1399
1400 let ct2 = ct.clone();
1407 let go2_task = async move {
1408 let (_pub_tx, mut sub_rx) = go2.subscribe_and_join(topic, vec![]).await?.split();
1409
1410 let subscribe_fut = async {
1411 while let Some(ev) = sub_rx.try_next().await? {
1412 match ev {
1413 Event::Lagged => tracing::debug!("missed some messages :("),
1414 Event::Received(_) => unreachable!("test does not send messages"),
1415 other => tracing::debug!(?other, "gs event"),
1416 }
1417 }
1418
1419 tracing::debug!("subscribe stream ended");
1420 Ok::<_, AnyError>(())
1421 };
1422
1423 tokio::select! {
1424 _ = ct2.cancelled() => Ok(()),
1425 res = subscribe_fut => res,
1426 }
1427 }
1428 .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
1429 let go2_handle = task::spawn(go2_task);
1430
1431 let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
1433 let static_provider = StaticProvider::new();
1434 static_provider.add_endpoint_info(addr2);
1435 actor.endpoint.discovery().add(static_provider);
1436 let (tx, mut rx) = mpsc::channel::<()>(1);
1438 let ct1 = ct.clone();
1439 let go1_task = async move {
1440 tracing::info!("subscribing the first time");
1442 let sub_1a = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1443
1444 rx.recv().await.expect("signal for second subscribe");
1446 tracing::info!("subscribing a second time");
1447 let sub_1b = go1.subscribe_and_join(topic, vec![endpoint_id2]).await?;
1448 drop(sub_1a);
1449
1450 rx.recv().await.expect("signal for second subscribe");
1452 tracing::info!("dropping all handles");
1453 drop(sub_1b);
1454
1455 ct1.cancelled().await;
1457 drop(go1);
1458
1459 Ok::<_, AnyError>(())
1460 }
1461 .instrument(tracing::debug_span!("endpoint_1", %endpoint_id1));
1462 let go1_handle = task::spawn(go1_task);
1463
1464 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1469 assert!(state.joined());
1470
1471 tx.send(())
1473 .await
1474 .std_context("signal additional subscribe")?;
1475 actor.steps(3).await; let state = actor.topics.get(&topic).expect("get registered topic");
1477 assert!(state.joined());
1478
1479 tx.send(()).await.std_context("signal drop handles")?;
1481 actor.steps(2).await; assert!(!actor.topics.contains_key(&topic));
1483
1484 ct.cancel();
1486 let wait = Duration::from_secs(2);
1487 timeout(wait, ep1_handle)
1488 .await
1489 .std_context("wait endpoint1 task")?
1490 .std_context("join endpoint1 task")??;
1491 timeout(wait, ep2_handle)
1492 .await
1493 .std_context("wait endpoint2 task")?
1494 .std_context("join endpoint2 task")??;
1495 timeout(wait, go1_handle)
1496 .await
1497 .std_context("wait gossip1 task")?
1498 .std_context("join gossip1 task")??;
1499 timeout(wait, go2_handle)
1500 .await
1501 .std_context("wait gossip2 task")?
1502 .std_context("join gossip2 task")??;
1503 timeout(wait, actor.finish())
1504 .await
1505 .std_context("wait actor finish")?;
1506
1507 Ok(())
1508 }
1509
    /// Drops a subscription on endpoint 2 and verifies endpoint 1 observes the
    /// neighbor leave (`NeighborDown`) and, after endpoint 2 resubscribes,
    /// rejoin (`NeighborUp`).
    #[tokio::test]
    #[traced_test]
    async fn can_reconnect() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let ct = CancellationToken::new();
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();

        let (go1, ep1, ep1_handle, _test_actor_handle1) =
            Gossip::t_new(rng, Default::default(), relay_map.clone(), &ct).await?;

        let (go2, ep2, ep2_handle, _test_actor_handle2) =
            Gossip::t_new(rng, Default::default(), relay_map, &ct).await?;

        let endpoint_id1 = ep1.id();
        let endpoint_id2 = ep2.id();
        tracing::info!(
            endpoint_1 = %endpoint_id1.fmt_short(),
            endpoint_2 = %endpoint_id2.fmt_short(),
            "endpoints ready"
        );

        let topic: TopicId = blake3::hash(b"can_reconnect").into();
        tracing::info!(%topic, "joining");

        let ct2 = ct.child_token();
        // `tx`/`rx` sequence the unsubscribe and resubscribe steps below.
        let (tx, mut rx) = mpsc::channel::<()>(1);
        // Endpoint 2 needs endpoint 1's relay address to dial it.
        let addr1 = EndpointAddr::new(endpoint_id1).with_relay_url(relay_url.clone());
        let static_provider = StaticProvider::new();
        static_provider.add_endpoint_info(addr1);
        ep2.discovery().add(static_provider.clone());
        let go2_task = async move {
            let mut sub = go2.subscribe(topic, Vec::new()).await?;
            sub.joined().await?;

            // Step 1: drop the subscription so endpoint 1 sees us leave.
            rx.recv().await.expect("signal to unsubscribe");
            tracing::info!("unsubscribing");
            drop(sub);

            // Step 2: subscribe again, bootstrapping via endpoint 1.
            rx.recv().await.expect("signal to subscribe again");
            tracing::info!("resubscribing");
            let mut sub = go2.subscribe(topic, vec![endpoint_id1]).await?;

            sub.joined().await?;
            tracing::info!("subscription successful!");

            ct2.cancelled().await;

            Ok::<_, ApiError>(())
        }
        .instrument(tracing::debug_span!("endpoint_2", %endpoint_id2));
        let go2_handle = task::spawn(go2_task);

        // Endpoint 1 needs endpoint 2's relay address in turn.
        let addr2 = EndpointAddr::new(endpoint_id2).with_relay_url(relay_url);
        static_provider.add_endpoint_info(addr2);
        ep1.discovery().add(static_provider);

        let mut sub = go1.subscribe(topic, vec![endpoint_id2]).await?;
        sub.joined().await?;

        tx.send(()).await.std_context("signal unsubscribe")?;

        // Endpoint 2 dropped its subscription: expect NeighborDown.
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor down")??;
        assert_eq!(ev, Some(Event::NeighborDown(endpoint_id2)));
        tracing::info!("endpoint 2 left");

        tx.send(()).await.std_context("signal resubscribe")?;

        // Endpoint 2 resubscribed: expect NeighborUp.
        let conn_timeout = Duration::from_millis(500);
        let ev = timeout(conn_timeout, sub.try_next())
            .await
            .std_context("wait neighbor up")??;
        assert_eq!(ev, Some(Event::NeighborUp(endpoint_id2)));
        tracing::info!("endpoint 2 rejoined!");

        // Shut everything down and verify all tasks finish within the deadline.
        ct.cancel();
        let wait = Duration::from_secs(2);
        timeout(wait, ep1_handle)
            .await
            .std_context("wait endpoint1 task")?
            .std_context("join endpoint1 task")??;
        timeout(wait, ep2_handle)
            .await
            .std_context("wait endpoint2 task")?
            .std_context("join endpoint2 task")??;
        timeout(wait, go2_handle)
            .await
            .std_context("wait gossip2 task")?
            .std_context("join gossip2 task")??;

        Result::Ok(())
    }
1616
    /// Simulates a peer process dying (its thread-local runtime cancelled) and a
    /// new process with the *same* secret key reconnecting: a long-lived receiver
    /// must get one broadcast from each life of the broadcaster.
    #[tokio::test]
    #[traced_test]
    async fn can_die_and_reconnect() -> Result {
        /// Runs `fut` on a single-threaded runtime in its own OS thread.
        /// Yields `None` when `cancel` fires before the future completes —
        /// used here to hard-kill a peer.
        fn run_in_thread<T: Send + 'static>(
            cancel: CancellationToken,
            fut: impl std::future::Future<Output = T> + Send + 'static,
        ) -> std::thread::JoinHandle<Option<T>> {
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(async move { cancel.run_until_cancelled(fut).await })
            })
        }

        /// Binds an endpoint with `secret_key` and spawns gossip behind a router.
        async fn spawn_gossip(
            secret_key: SecretKey,
            relay_map: RelayMap,
        ) -> Result<(Router, Gossip), BindError> {
            let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
                .secret_key(secret_key)
                .insecure_skip_relay_cert_verify(true)
                .bind()
                .await?;
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            Ok((router, gossip))
        }

        /// Joins `topic_id` via `bootstrap_addr`, broadcasts `message` once, then
        /// parks forever; the caller kills it via `run_in_thread`'s cancel token.
        async fn broadcast_once(
            secret_key: SecretKey,
            relay_map: RelayMap,
            bootstrap_addr: EndpointAddr,
            topic_id: TopicId,
            message: String,
        ) -> Result {
            let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
            info!(endpoint_id = %router.endpoint().id().fmt_short(), "broadcast endpoint spawned");
            let bootstrap = vec![bootstrap_addr.id];
            let static_provider = StaticProvider::new();
            static_provider.add_endpoint_info(bootstrap_addr);
            router.endpoint().discovery().add(static_provider);
            let mut topic = gossip.subscribe_and_join(topic_id, bootstrap).await?;
            topic.broadcast(message.as_bytes().to_vec().into()).await?;
            std::future::pending::<()>().await;
            Ok(())
        }

        let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let topic_id = TopicId::from_bytes(rng.random());

        // Receiver task: publishes its address via `addr_tx`, then forwards every
        // decoded broadcast payload through `msgs_recv_tx`.
        let (addr_tx, addr_rx) = tokio::sync::oneshot::channel();
        let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3);
        let recv_task = tokio::task::spawn({
            let relay_map = relay_map.clone();
            let secret_key = SecretKey::generate(&mut rng);
            async move {
                let (router, gossip) = spawn_gossip(secret_key, relay_map).await?;
                router.endpoint().online().await;
                let addr = router.endpoint().addr();
                info!(endpoint_id = %addr.id.fmt_short(), "recv endpoint spawned");
                addr_tx.send(addr).unwrap();
                let mut topic = gossip.subscribe_and_join(topic_id, vec![]).await?;
                while let Some(event) = topic.try_next().await.unwrap() {
                    if let Event::Received(message) = event {
                        let message = std::str::from_utf8(&message.content)
                            .std_context("decode broadcast message")?
                            .to_string();
                        msgs_recv_tx
                            .send(message)
                            .await
                            .std_context("forward received message")?;
                    }
                }
                Ok::<_, AnyError>(())
            }
        });

        let endpoint0_addr = addr_rx.await.std_context("receive endpoint address")?;
        let max_wait = Duration::from_secs(5);

        // First life of the broadcaster; `secret` is reused for the second life.
        let cancel = CancellationToken::new();
        let secret = SecretKey::generate(&mut rng);
        let join_handle_1 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg1".to_string(),
            ),
        );
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for first broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg1");
        info!("kill broadcast endpoint");
        cancel.cancel();

        // Second life: the same identity reconnects after being killed; the
        // receiver must still get its broadcast.
        let cancel = CancellationToken::new();
        let join_handle_2 = run_in_thread(
            cancel.clone(),
            broadcast_once(
                secret.clone(),
                relay_map.clone(),
                endpoint0_addr.clone(),
                topic_id,
                "msg2".to_string(),
            ),
        );
        let msg = timeout(max_wait, msgs_recv_rx.recv())
            .await
            .std_context("wait for second broadcast")?
            .std_context("receiver dropped channel")?;
        assert_eq!(&msg, "msg2");
        info!("kill broadcast endpoint");
        cancel.cancel();

        info!("kill recv endpoint");
        recv_task.abort();
        // Both broadcaster lives were cancelled, so each thread yields `None`.
        assert!(join_handle_1.join().unwrap().is_none());
        assert!(join_handle_2.join().unwrap().is_none());

        Ok(())
    }
1764
1765 #[tokio::test]
1766 #[traced_test]
1767 async fn gossip_change_alpn() -> n0_error::Result<()> {
1768 let alpn = b"my-gossip-alpn";
1769 let topic_id = TopicId::from([0u8; 32]);
1770
1771 let ep1 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
1772 let ep2 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
1773 let gossip1 = Gossip::builder().alpn(alpn).spawn(ep1.clone());
1774 let gossip2 = Gossip::builder().alpn(alpn).spawn(ep2.clone());
1775 let router1 = Router::builder(ep1).accept(alpn, gossip1.clone()).spawn();
1776 let router2 = Router::builder(ep2).accept(alpn, gossip2.clone()).spawn();
1777
1778 let addr1 = router1.endpoint().addr();
1779 let id1 = addr1.id;
1780 let static_provider = StaticProvider::new();
1781 static_provider.add_endpoint_info(addr1);
1782 router2.endpoint().discovery().add(static_provider);
1783
1784 let mut topic1 = gossip1.subscribe(topic_id, vec![]).await?;
1785 let mut topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;
1786
1787 timeout(Duration::from_secs(3), topic1.joined())
1788 .await
1789 .std_context("wait topic1 join")??;
1790 timeout(Duration::from_secs(3), topic2.joined())
1791 .await
1792 .std_context("wait topic2 join")??;
1793 router1.shutdown().await.std_context("shutdown router1")?;
1794 router2.shutdown().await.std_context("shutdown router2")?;
1795 Ok(())
1796 }
1797
    /// Endpoint 3 is only told endpoint 1's address; it must learn about
    /// endpoint 2 through gossip itself and end up with both as neighbors.
    #[tokio::test]
    #[traced_test]
    async fn gossip_rely_on_gossip_discovery() -> n0_error::Result<()> {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1);

        /// Spawns an endpoint + gossip + router and subscribes to the fixed
        /// topic with no bootstrap peers; returns the split topic handles.
        async fn spawn(
            rng: &mut impl CryptoRng,
        ) -> n0_error::Result<(EndpointId, Router, Gossip, GossipSender, GossipReceiver)> {
            let topic_id = TopicId::from([0u8; 32]);
            let ep = Endpoint::empty_builder(RelayMode::Disabled)
                .secret_key(SecretKey::generate(rng))
                .bind()
                .await?;
            let endpoint_id = ep.id();
            let gossip = Gossip::builder().spawn(ep.clone());
            let router = Router::builder(ep)
                .accept(GOSSIP_ALPN, gossip.clone())
                .spawn();
            let topic = gossip.subscribe(topic_id, vec![]).await?;
            let (sender, receiver) = topic.split();
            Ok((endpoint_id, router, gossip, sender, receiver))
        }

        let (n1, r1, _g1, _tx1, mut rx1) = spawn(rng).await?;
        let (n2, r2, _g2, tx2, mut rx2) = spawn(rng).await?;
        let (n3, r3, _g3, tx3, mut rx3) = spawn(rng).await?;

        println!("endpoints {:?}", [n1, n2, n3]);

        // Only endpoint 1's address is ever published via static discovery.
        let addr1 = r1.endpoint().addr();
        let disco = StaticProvider::new();
        disco.add_endpoint_info(addr1);

        // Endpoint 2 joins via endpoint 1.
        r2.endpoint().discovery().add(disco.clone());
        tx2.join_peers(vec![n1]).await?;

        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // Endpoint 3 also only bootstraps via endpoint 1 ...
        r3.endpoint().discovery().add(disco.clone());
        tx3.join_peers(vec![n1]).await?;

        // ... but should see two NeighborUp events: endpoint 2's address must
        // have been learned through gossip, not static discovery.
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 first neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));
        let ev = timeout(Duration::from_secs(3), rx3.next())
            .await
            .std_context("wait rx3 second neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(_)))));

        assert_eq!(sorted(rx3.neighbors()), sorted([n1, n2]));

        // Endpoints 1 and 2 each see endpoint 3 come up as a neighbor.
        let ev = timeout(Duration::from_secs(3), rx2.next())
            .await
            .std_context("wait rx2 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        let ev = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait rx1 neighbor")?;
        assert!(matches!(ev, Some(Ok(Event::NeighborUp(n))) if n == n3));

        tokio::try_join!(r1.shutdown(), r2.shutdown(), r3.shutdown())
            .std_context("shutdown routers")?;
        Ok(())
    }
1876
1877 fn sorted<T: Ord>(input: impl IntoIterator<Item = T>) -> Vec<T> {
1878 let mut out: Vec<_> = input.into_iter().collect();
1879 out.sort();
1880 out
1881 }
1882
    /// Dropping a topic's sender half must not tear down the subscription:
    /// the receiver half alone keeps the topic alive and still gets messages.
    #[tokio::test]
    #[traced_test]
    async fn topic_stays_alive_after_sender_drop() -> n0_error::Result<()> {
        let topic_id = TopicId::from([99u8; 32]);

        let ep1 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let ep2 = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let gossip1 = Gossip::builder().spawn(ep1.clone());
        let gossip2 = Gossip::builder().spawn(ep2.clone());
        let router1 = Router::builder(ep1)
            .accept(crate::ALPN, gossip1.clone())
            .spawn();
        let router2 = Router::builder(ep2)
            .accept(crate::ALPN, gossip2.clone())
            .spawn();

        // Endpoint 2 bootstraps via endpoint 1's address.
        let addr1 = router1.endpoint().addr();
        let id1 = addr1.id;
        let static_provider = StaticProvider::new();
        static_provider.add_endpoint_info(addr1);
        router2.endpoint().discovery().add(static_provider);

        let topic1 = gossip1.subscribe(topic_id, vec![]).await?;
        let topic2 = gossip2.subscribe(topic_id, vec![id1]).await?;

        let (tx1, mut rx1) = topic1.split();
        let (tx2, mut rx2) = topic2.split();

        timeout(Duration::from_secs(3), rx1.joined())
            .await
            .std_context("wait rx1 join")??;
        timeout(Duration::from_secs(3), rx2.joined())
            .await
            .std_context("wait rx2 join")??;

        // Drop node 1's sender; only its receiver half remains alive.
        drop(tx1);

        tx2.broadcast(b"hello from node2".to_vec().into()).await?;

        // The broadcast must still arrive on node 1's receiver.
        let event = timeout(Duration::from_secs(3), rx1.next())
            .await
            .std_context("wait for message on rx1")?;

        match event {
            Some(Ok(Event::Received(msg))) => {
                assert_eq!(&msg.content[..], b"hello from node2");
            }
            other => panic!("expected Received event, got {:?}", other),
        }

        drop(tx2);
        drop(rx1);
        drop(rx2);
        router1.shutdown().await.std_context("shutdown router1")?;
        router2.shutdown().await.std_context("shutdown router2")?;
        Ok(())
    }
1949
    /// Repeatedly takes peer A offline (endpoint close) and restarts it with the
    /// same secret key, checking that peer B observes NeighborDown/NeighborUp
    /// and that broadcasts flow again after every one of ten reconnects.
    #[tokio::test]
    #[traced_test]
    async fn peer_reconnect_after_offline() -> Result {
        let rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(42);
        let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap();
        let topic_id = TopicId::from([42u8; 32]);
        let disco = StaticProvider::new();

        // Keys are kept around so peer A can be restarted with the same identity.
        let secret_key_a = SecretKey::generate(rng);
        let secret_key_b = SecretKey::generate(rng);

        let ep_a = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .secret_key(secret_key_a.clone())
            .insecure_skip_relay_cert_verify(true)
            .discovery(disco.clone())
            .bind()
            .await?;
        disco.set_endpoint_info(EndpointAddr::new(ep_a.id()).with_relay_url(relay_url.clone()));
        // `mut`: peer A's gossip and router are replaced on every restart below.
        let mut gossip_a = Gossip::builder().spawn(ep_a.clone());
        let mut router_a = Router::builder(ep_a)
            .accept(GOSSIP_ALPN, gossip_a.clone())
            .spawn();
        router_a.endpoint().online().await;
        info!("Peer A started: {}", router_a.endpoint().id().fmt_short());

        let ep_b = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .secret_key(secret_key_b.clone())
            .insecure_skip_relay_cert_verify(true)
            .discovery(disco.clone())
            .bind()
            .await?;
        disco.set_endpoint_info(EndpointAddr::new(ep_b.id()).with_relay_url(relay_url.clone()));
        let gossip_b = Gossip::builder().spawn(ep_b.clone());
        let router_b = Router::builder(ep_b)
            .accept(GOSSIP_ALPN, gossip_b.clone())
            .spawn();
        router_b.endpoint().online().await;
        info!("Peer B started: {}", router_b.endpoint().id().fmt_short());

        let id_b = router_b.endpoint().id();
        let id_a = router_a.endpoint().id();

        let topic_a = gossip_a.subscribe(topic_id, vec![]).await?;
        let (_tx_a, mut rx_a) = topic_a.split();

        let topic_b = gossip_b.subscribe(topic_id, vec![id_a]).await?;
        let (tx_b, mut rx_b) = topic_b.split();

        timeout_2s(rx_a.joined()).await??;
        timeout_2s(rx_b.joined()).await??;
        info!("Both peers joined");

        // Sanity check that gossip works before the offline/online cycles.
        tx_b.broadcast(b"hello from B".to_vec().into()).await?;
        match timeout_2s(rx_a.next()).await? {
            Some(Ok(Event::Received(msg))) => assert_eq!(&msg.content[..], b"hello from B"),
            other => panic!("expected Received event, got {:?}", other),
        }
        info!("Initial communication successful");

        // Ten offline/online cycles, each wrapped in its own tracing span.
        for i in 0..10 {
            async {
                info!("Shutting down Peer A to simulate going offline");
                router_a.endpoint().close().await;

                info!("Waiting for Peer B to detect Peer A going offline");
                match timeout_2s(rx_b.next()).await? {
                    Some(Ok(Event::NeighborDown(peer))) if peer == id_a => {}
                    other => panic!("Peer B received unexpected event: {other:?}"),
                }

                info!("Restarting Peer A");
                // Same secret key: the restarted endpoint keeps A's identity.
                let ep_a = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
                    .secret_key(secret_key_a.clone())
                    .discovery(disco.clone())
                    .insecure_skip_relay_cert_verify(true)
                    .bind()
                    .await?;
                gossip_a = Gossip::builder().spawn(ep_a.clone());
                router_a = Router::builder(ep_a)
                    .accept(GOSSIP_ALPN, gossip_a.clone())
                    .spawn();
                router_a.endpoint().online().await;
                info!("Peer A restarted: {}", router_a.endpoint().id().fmt_short());

                // Fresh subscription on the restarted endpoint, bootstrapping via B.
                let topic_a = gossip_a.subscribe(topic_id, vec![id_b]).await?;
                let (_tx_a, mut rx_a) = topic_a.split();

                info!("Waiting for Peer A to rejoin");
                timeout_2s(rx_a.joined()).await??;
                info!("Peer A successfully rejoined");

                match timeout_2s(rx_b.next()).await? {
                    Some(Ok(Event::NeighborUp(peer))) if peer == id_a => {}
                    other => panic!("Peer B received unexpected event: {other:?}"),
                }

                // Round-trip a message to prove the link is usable again.
                tx_b.broadcast(format!("hello from B after reconnect {i}").into())
                    .await?;
                let event = match timeout_2s(rx_a.next()).await? {
                    Some(Ok(Event::Received(msg))) => msg,
                    other => panic!("Peer A received unexpected event {other:?}"),
                };
                assert_eq!(
                    &event.content[..],
                    format!("hello from B after reconnect {i}").as_bytes()
                );
                info!("Communication restored after reconnection");
                n0_error::Ok(())
            }
            .instrument(error_span!("round", %i))
            .await?;
        }

        router_a
            .shutdown()
            .await
            .std_context("shutdown router_a2")?;
        router_b.shutdown().await.std_context("shutdown router_b")?;

        Ok(())
    }
2093
2094 async fn timeout_2s<T>(fut: impl Future<Output = T>) -> Result<T> {
2095 tokio::time::timeout(Duration::from_secs(2), fut)
2096 .await
2097 .std_context("timeout")
2098 }
2099}