#![allow(missing_docs)]
use std::{
collections::{HashMap, HashSet},
time::SystemTime,
};
use anyhow::{Context, Result};
use futures_lite::FutureExt;
use iroh::{key::PublicKey, Endpoint, NodeAddr, NodeId};
use iroh_blobs::{
downloader::{DownloadError, DownloadRequest, Downloader},
get::Stats,
store::EntryStatus,
Hash, HashAndFormat,
};
use iroh_gossip::net::Gossip;
use iroh_metrics::inc;
use serde::{Deserialize, Serialize};
use tokio::{
sync::{self, mpsc, oneshot},
task::JoinSet,
};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
use super::state::{NamespaceStates, Origin, SyncReason};
use crate::{
actor::{OpenOpts, SyncHandle},
engine::gossip::GossipState,
metrics::Metrics,
net::{
connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
SyncFinished,
},
AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};
const SOURCE_NAME: &str = "docs_engine";
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
Put(SignedEntry),
ContentReady(Hash),
SyncReport(SyncReport),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
namespace: NamespaceId,
heads: Vec<u8>,
}
#[derive(derive_more::Debug, strum::Display)]
pub enum ToLiveActor {
StartSync {
namespace: NamespaceId,
peers: Vec<NodeAddr>,
#[debug("onsehot::Sender")]
reply: sync::oneshot::Sender<anyhow::Result<()>>,
},
Leave {
namespace: NamespaceId,
kill_subscribers: bool,
#[debug("onsehot::Sender")]
reply: sync::oneshot::Sender<anyhow::Result<()>>,
},
Shutdown {
reply: sync::oneshot::Sender<()>,
},
Subscribe {
namespace: NamespaceId,
#[debug("sender")]
sender: async_channel::Sender<Event>,
#[debug("oneshot::Sender")]
reply: sync::oneshot::Sender<Result<()>>,
},
HandleConnection {
conn: iroh::endpoint::Connecting,
},
AcceptSyncRequest {
namespace: NamespaceId,
peer: PublicKey,
#[debug("oneshot::Sender")]
reply: sync::oneshot::Sender<AcceptOutcome>,
},
IncomingSyncReport {
from: PublicKey,
report: SyncReport,
},
NeighborContentReady {
namespace: NamespaceId,
node: PublicKey,
hash: Hash,
},
NeighborUp {
namespace: NamespaceId,
peer: PublicKey,
},
NeighborDown {
namespace: NamespaceId,
peer: PublicKey,
},
}
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
ContentReady {
hash: Hash,
},
NeighborUp(PublicKey),
NeighborDown(PublicKey),
SyncFinished(SyncEvent),
PendingContentReady,
}
type SyncConnectRes = (
NamespaceId,
PublicKey,
SyncReason,
Result<SyncFinished, ConnectError>,
);
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
type DownloadRes = (NamespaceId, Hash, Result<Stats, DownloadError>);
pub struct LiveActor<B: iroh_blobs::store::Store> {
inbox: mpsc::Receiver<ToLiveActor>,
sync: SyncHandle,
endpoint: Endpoint,
bao_store: B,
downloader: Downloader,
replica_events_tx: async_channel::Sender<crate::Event>,
replica_events_rx: async_channel::Receiver<crate::Event>,
sync_actor_tx: mpsc::Sender<ToLiveActor>,
gossip: GossipState,
running_sync_connect: JoinSet<SyncConnectRes>,
running_sync_accept: JoinSet<SyncAcceptRes>,
download_tasks: JoinSet<DownloadRes>,
missing_hashes: HashSet<Hash>,
queued_hashes: QueuedHashes,
subscribers: SubscribersMap,
state: NamespaceStates,
}
impl<B: iroh_blobs::store::Store> LiveActor<B> {
#[allow(clippy::too_many_arguments)]
pub fn new(
sync: SyncHandle,
endpoint: Endpoint,
gossip: Gossip,
bao_store: B,
downloader: Downloader,
inbox: mpsc::Receiver<ToLiveActor>,
sync_actor_tx: mpsc::Sender<ToLiveActor>,
) -> Self {
let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
Self {
inbox,
sync,
replica_events_rx,
replica_events_tx,
endpoint,
gossip: gossip_state,
bao_store,
downloader,
sync_actor_tx,
running_sync_connect: Default::default(),
running_sync_accept: Default::default(),
subscribers: Default::default(),
download_tasks: Default::default(),
state: Default::default(),
missing_hashes: Default::default(),
queued_hashes: Default::default(),
}
}
pub async fn run(mut self) -> Result<()> {
let shutdown_reply = self.run_inner().await;
if let Err(err) = self.shutdown().await {
error!(?err, "Error during shutdown");
}
drop(self);
match shutdown_reply {
Ok(reply) => {
reply.send(()).ok();
Ok(())
}
Err(err) => Err(err),
}
}
async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
let mut i = 0;
loop {
i += 1;
trace!(?i, "tick wait");
inc!(Metrics, doc_live_tick_main);
tokio::select! {
biased;
msg = self.inbox.recv() => {
let msg = msg.context("to_actor closed")?;
trace!(?i, %msg, "tick: to_actor");
inc!(Metrics, doc_live_tick_actor);
match msg {
ToLiveActor::Shutdown { reply } => {
break Ok(reply);
}
msg => {
self.on_actor_message(msg).await.context("on_actor_message")?;
}
}
}
event = self.replica_events_rx.recv() => {
trace!(?i, "tick: replica_event");
inc!(Metrics, doc_live_tick_replica_event);
let event = event.context("replica_events closed")?;
if let Err(err) = self.on_replica_event(event).await {
error!(?err, "Failed to process replica event");
}
}
Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
trace!(?i, "tick: running_sync_connect");
inc!(Metrics, doc_live_tick_running_sync_connect);
let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
}
Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
trace!(?i, "tick: running_sync_accept");
inc!(Metrics, doc_live_tick_running_sync_accept);
let res = res.context("running_sync_accept closed")?;
self.on_sync_via_accept_finished(res).await;
}
Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
trace!(?i, "tick: pending_downloads");
inc!(Metrics, doc_live_tick_pending_downloads);
let (namespace, hash, res) = res.context("pending_downloads closed")?;
self.on_download_ready(namespace, hash, res).await;
}
res = self.gossip.progress(), if !self.gossip.is_empty() => {
if let Err(error) = res {
warn!(?error, "gossip state failed");
}
}
}
}
}
async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
match msg {
ToLiveActor::Shutdown { .. } => {
unreachable!("handled in run");
}
ToLiveActor::IncomingSyncReport { from, report } => {
self.on_sync_report(from, report).await
}
ToLiveActor::NeighborUp { namespace, peer } => {
debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
self.subscribers
.send(&namespace, Event::NeighborUp(peer))
.await;
}
ToLiveActor::NeighborDown { namespace, peer } => {
debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
self.subscribers
.send(&namespace, Event::NeighborDown(peer))
.await;
}
ToLiveActor::StartSync {
namespace,
peers,
reply,
} => {
let res = self.start_sync(namespace, peers).await;
reply.send(res).ok();
}
ToLiveActor::Leave {
namespace,
kill_subscribers,
reply,
} => {
let res = self.leave(namespace, kill_subscribers).await;
reply.send(res).ok();
}
ToLiveActor::Subscribe {
namespace,
sender,
reply,
} => {
self.subscribers.subscribe(namespace, sender);
reply.send(Ok(())).ok();
}
ToLiveActor::HandleConnection { conn } => {
self.handle_connection(conn).await;
}
ToLiveActor::AcceptSyncRequest {
namespace,
peer,
reply,
} => {
let outcome = self.accept_sync_request(namespace, peer);
reply.send(outcome).ok();
}
ToLiveActor::NeighborContentReady {
namespace,
node,
hash,
} => {
self.on_neighbor_content_ready(namespace, node, hash).await;
}
};
Ok(true)
}
#[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
if !self.state.start_connect(&namespace, peer, reason) {
return;
}
let endpoint = self.endpoint.clone();
let sync = self.sync.clone();
let fut = async move {
let res = connect_and_sync(&endpoint, &sync, namespace, NodeAddr::new(peer)).await;
(namespace, peer, reason, res)
}
.instrument(Span::current());
self.running_sync_connect.spawn(fut);
}
async fn shutdown(&mut self) -> anyhow::Result<()> {
self.subscribers.clear();
let (gossip_shutdown_res, _store) = tokio::join!(
self.gossip.shutdown(),
self.sync.shutdown()
);
gossip_shutdown_res?;
Ok(())
}
async fn start_sync(&mut self, namespace: NamespaceId, mut peers: Vec<NodeAddr>) -> Result<()> {
debug!(?namespace, peers = peers.len(), "start sync");
if !self.state.is_syncing(&namespace) {
let opts = OpenOpts::default()
.sync()
.subscribe(self.replica_events_tx.clone());
self.sync.open(namespace, opts).await?;
self.state.insert(namespace);
}
match self.sync.get_sync_peers(namespace).await {
Ok(None) => {
}
Ok(Some(known_useful_peers)) => {
let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
match PublicKey::from_bytes(&peer_id_bytes) {
Ok(public_key) => Some(NodeAddr::new(public_key)),
Err(_signing_error) => {
warn!("potential db corruption: peers per doc can't be decoded");
None
}
}
});
peers.extend(as_node_addr);
}
Err(e) => {
warn!(%e, "db error reading peers per document")
}
}
self.join_peers(namespace, peers).await?;
Ok(())
}
async fn leave(
&mut self,
namespace: NamespaceId,
kill_subscribers: bool,
) -> anyhow::Result<()> {
if self.state.remove(&namespace) {
self.sync.set_sync(namespace, false).await?;
self.sync
.unsubscribe(namespace, self.replica_events_tx.clone())
.await?;
self.sync.close(namespace).await?;
self.gossip.quit(&namespace);
}
if kill_subscribers {
self.subscribers.remove(&namespace);
}
Ok(())
}
async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
let mut peer_ids = Vec::new();
for peer in peers.into_iter() {
let peer_id = peer.node_id;
if peer.info.is_empty() {
peer_ids.push(peer_id)
} else {
match self.endpoint.add_node_addr_with_source(peer, SOURCE_NAME) {
Ok(()) => {
peer_ids.push(peer_id);
}
Err(err) => {
warn!(peer = %peer_id.fmt_short(), "failed to add known addrs: {err:?}");
}
}
}
}
self.gossip.join(namespace, peer_ids.clone()).await?;
if !peer_ids.is_empty() {
for peer in peer_ids {
self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
}
}
Ok(())
}
#[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
async fn on_sync_via_connect_finished(
&mut self,
namespace: NamespaceId,
peer: PublicKey,
reason: SyncReason,
result: Result<SyncFinished, ConnectError>,
) {
match result {
Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
debug!(?reason, "remote abort, already syncing");
}
res => {
self.on_sync_finished(
namespace,
peer,
Origin::Connect(reason),
res.map_err(Into::into),
)
.await
}
}
}
#[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
match res {
Ok(state) => {
self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
.await
}
Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
debug!(?reason, "aborted by us");
}
Err(err) => {
if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
self.on_sync_finished(
namespace,
peer,
Origin::Accept,
Err(anyhow::Error::from(err)),
)
.await;
} else {
debug!(?err, "failed before reading the first message");
}
}
}
}
async fn on_sync_finished(
&mut self,
namespace: NamespaceId,
peer: PublicKey,
origin: Origin,
result: Result<SyncFinished>,
) {
match &result {
Err(ref err) => {
warn!(?origin, ?err, "sync failed");
}
Ok(ref details) => {
info!(
sent = %details.outcome.num_sent,
recv = %details.outcome.num_recv,
t_connect = ?details.timings.connect,
t_process = ?details.timings.process,
"sync finished",
);
if let Err(e) = self
.sync
.register_useful_peer(namespace, *peer.as_bytes())
.await
{
debug!(%e, "failed to register peer for document")
}
if details.outcome.num_recv > 0 {
info!("broadcast sync report to neighbors");
match details
.outcome
.heads_received
.encode(Some(self.gossip.max_message_size()))
{
Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
Ok(heads) => {
let report = SyncReport { namespace, heads };
self.broadcast_neighbors(namespace, &Op::SyncReport(report))
.await;
}
}
}
}
};
let result_for_event = match &result {
Ok(details) => Ok(details.into()),
Err(err) => Err(err.to_string()),
};
let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
return;
};
let ev = SyncEvent {
peer,
origin,
result: result_for_event,
finished: SystemTime::now(),
started,
};
self.subscribers
.send(&namespace, Event::SyncFinished(ev))
.await;
if self.queued_hashes.contains_namespace(&namespace) {
self.state.set_may_emit_ready(&namespace, true);
} else {
self.subscribers
.send(&namespace, Event::PendingContentReady)
.await;
self.state.set_may_emit_ready(&namespace, false);
}
if resync {
self.sync_with_peer(namespace, peer, SyncReason::Resync);
}
}
async fn broadcast_neighbors(&self, namespace: NamespaceId, op: &Op) {
if !self.state.is_syncing(&namespace) {
return;
}
let msg = match postcard::to_stdvec(op) {
Ok(msg) => msg,
Err(err) => {
error!(?err, ?op, "Failed to serialize message:");
return;
}
};
self.gossip
.broadcast_neighbors(&namespace, msg.into())
.await;
}
async fn on_download_ready(
&mut self,
namespace: NamespaceId,
hash: Hash,
res: Result<Stats, DownloadError>,
) {
let completed_namespaces = self.queued_hashes.remove_hash(&hash);
debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
if res.is_ok() {
self.subscribers
.send(&namespace, Event::ContentReady { hash })
.await;
self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
.await;
} else {
self.missing_hashes.insert(hash);
}
for namespace in completed_namespaces.iter() {
if let Some(true) = self.state.may_emit_ready(namespace) {
self.subscribers
.send(namespace, Event::PendingContentReady)
.await;
}
}
}
async fn on_neighbor_content_ready(
&mut self,
namespace: NamespaceId,
node: NodeId,
hash: Hash,
) {
self.start_download(namespace, hash, node, true).await;
}
#[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
let namespace = report.namespace;
if !self.state.is_syncing(&namespace) {
return;
}
let heads = match AuthorHeads::decode(&report.heads) {
Ok(heads) => heads,
Err(err) => {
warn!(?err, "failed to decode AuthorHeads");
return;
}
};
match self.sync.has_news_for_us(report.namespace, heads).await {
Ok(Some(updated_authors)) => {
info!(%updated_authors, "news reported: sync now");
self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
}
Ok(None) => {
debug!("no news reported: nothing to do");
}
Err(err) => {
warn!("sync actor error: {err:?}");
}
}
}
async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
match event {
crate::Event::LocalInsert { namespace, entry } => {
debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
if self.state.is_syncing(&namespace) {
let op = Op::Put(entry.clone());
let message = postcard::to_stdvec(&op)?.into();
self.gossip.broadcast(&namespace, message).await;
}
}
crate::Event::RemoteInsert {
namespace,
entry,
from,
should_download,
remote_content_status,
} => {
debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
if should_download {
let hash = entry.content_hash();
if matches!(remote_content_status, ContentStatus::Complete) {
let node_id = PublicKey::from_bytes(&from)?;
self.start_download(namespace, hash, node_id, false).await;
} else {
self.missing_hashes.insert(hash);
}
}
}
}
Ok(())
}
async fn start_download(
&mut self,
namespace: NamespaceId,
hash: Hash,
node: PublicKey,
only_if_missing: bool,
) {
let entry_status = self.bao_store.entry_status(&hash).await;
if matches!(entry_status, Ok(EntryStatus::Complete)) {
self.missing_hashes.remove(&hash);
return;
}
if self.queued_hashes.contains_hash(&hash) {
self.queued_hashes.insert(hash, namespace);
self.downloader.nodes_have(hash, vec![node]).await;
} else if !only_if_missing || self.missing_hashes.contains(&hash) {
let req = DownloadRequest::new(HashAndFormat::raw(hash), vec![node]);
let handle = self.downloader.queue(req).await;
self.queued_hashes.insert(hash, namespace);
self.missing_hashes.remove(&hash);
self.download_tasks
.spawn(async move { (namespace, hash, handle.await) });
}
}
#[instrument("accept", skip_all)]
pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connecting) {
let to_actor_tx = self.sync_actor_tx.clone();
let accept_request_cb = move |namespace, peer| {
let to_actor_tx = to_actor_tx.clone();
async move {
let (reply_tx, reply_rx) = oneshot::channel();
to_actor_tx
.send(ToLiveActor::AcceptSyncRequest {
namespace,
peer,
reply: reply_tx,
})
.await
.ok();
match reply_rx.await {
Ok(outcome) => outcome,
Err(err) => {
warn!(
"accept request callback failed to retrieve reply from actor: {err:?}"
);
AcceptOutcome::Reject(AbortReason::InternalServerError)
}
}
}
.boxed()
};
debug!("incoming connection");
let sync = self.sync.clone();
self.running_sync_accept.spawn(
async move { handle_connection(sync, conn, accept_request_cb).await }
.instrument(Span::current()),
);
}
pub fn accept_sync_request(
&mut self,
namespace: NamespaceId,
peer: PublicKey,
) -> AcceptOutcome {
self.state
.accept_request(&self.endpoint.node_id(), &namespace, peer)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
pub peer: PublicKey,
pub origin: Origin,
pub finished: SystemTime,
pub started: SystemTime,
pub result: std::result::Result<SyncDetails, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
pub entries_received: usize,
pub entries_sent: usize,
}
impl From<&SyncFinished> for SyncDetails {
fn from(value: &SyncFinished) -> Self {
Self {
entries_received: value.outcome.num_recv,
entries_sent: value.outcome.num_sent,
}
}
}
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);
impl SubscribersMap {
fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
self.0.entry(namespace).or_default().subscribe(sender);
}
async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
debug!(namespace=%namespace.fmt_short(), %event, "emit event");
let Some(subscribers) = self.0.get_mut(namespace) else {
return false;
};
if !subscribers.send(event).await {
self.0.remove(namespace);
}
true
}
fn remove(&mut self, namespace: &NamespaceId) {
self.0.remove(namespace);
}
fn clear(&mut self) {
self.0.clear();
}
}
#[derive(Debug, Default)]
struct QueuedHashes {
by_hash: HashMap<Hash, HashSet<NamespaceId>>,
by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}
impl QueuedHashes {
fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
self.by_hash.entry(hash).or_default().insert(namespace);
self.by_namespace.entry(namespace).or_default().insert(hash);
}
fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
let namespaces = self.by_hash.remove(hash).unwrap_or_default();
let mut removed_namespaces = vec![];
for namespace in namespaces {
if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
hashes.remove(hash);
if hashes.is_empty() {
self.by_namespace.remove(&namespace);
removed_namespaces.push(namespace);
}
}
}
removed_namespaces
}
fn contains_hash(&self, hash: &Hash) -> bool {
self.by_hash.contains_key(hash)
}
fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
self.by_namespace.contains_key(namespace)
}
}
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
impl Subscribers {
fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
self.0.push(sender)
}
async fn send(&mut self, event: Event) -> bool {
let futs = self.0.iter().map(|sender| sender.send(event.clone()));
let res = futures_buffered::join_all(futs).await;
for (i, res) in res.into_iter().enumerate().rev() {
if res.is_err() {
self.0.remove(i);
}
}
!self.0.is_empty()
}
}
fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
match res {
Ok(res) => res.peer.fmt_short(),
Err(err) => err
.peer()
.map(|x| x.fmt_short())
.unwrap_or_else(|| "unknown".to_string()),
}
}
fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
match res {
Ok(res) => res.namespace.fmt_short(),
Err(err) => err
.namespace()
.map(|x| x.fmt_short())
.unwrap_or_else(|| "unknown".to_string()),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_sync_remove() {
let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
let (a_tx, a_rx) = async_channel::unbounded();
let (b_tx, b_rx) = async_channel::unbounded();
let mut subscribers = Subscribers::default();
subscribers.subscribe(a_tx);
subscribers.subscribe(b_tx);
drop(a_rx);
drop(b_rx);
subscribers.send(Event::NeighborUp(pk)).await;
}
}