iroh_gossip/cli.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
//! Define the gossiping subcommands.
use std::str::FromStr as _;
use anyhow::{Context, Result};
use clap::{ArgGroup, Subcommand};
use futures_lite::StreamExt;
use futures_util::SinkExt;
use iroh::NodeId;
use tokio::io::AsyncBufReadExt;
use crate::rpc::client::SubscribeOpts;
/// Commands to manage gossiping.
#[derive(Subcommand, Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum GossipCommands {
/// Subscribe to a gossip topic
#[command(
long_about = r#"Subscribe to a gossip topic
Example usage:
$ <cmd> gossip subscribe --topic test --start
This will print the current node's id. Open another terminal
or another machine and you can join the same topic:
# on another machine/terminal
$ <cmd> gossip subscribe --topic test <other node_id> --start
Any lines entered in stdin will be sent to the given topic
and received messages will be printed to stdout line-by-line.
The process waits for Ctrl+C to exit."#,
group(
ArgGroup::new("input")
.required(true)
.args(&["topic", "raw_topic"])
)
)]
Subscribe {
/// The topic to subscribe to.
///
/// This will be hashed with BLAKE3 to get the actual topic ID.
#[clap(long)]
topic: Option<String>,
/// The raw topic to subscribe to as hex. Needs to be 32 bytes, i.e. 64 hex characters.
#[clap(long)]
raw_topic: Option<String>,
/// The set of nodes that are also part of the gossip swarm to bootstrap with.
///
/// If empty, this will bootstrap a new swarm. Running the command will print
/// the node's `NodeId`, which can be used as the bootstrap argument in other nodes.
bootstrap: Vec<String>,
/// If enabled, all gossip events will be printed, including neighbor up/down events.
#[clap(long, short)]
verbose: bool,
},
}
impl GossipCommands {
/// Runs the gossip command given the iroh client.
pub async fn run(self, gossip: &crate::rpc::client::Client) -> Result<()> {
match self {
Self::Subscribe {
topic,
raw_topic,
bootstrap,
verbose,
} => {
let bootstrap = bootstrap
.into_iter()
.map(|node_id| NodeId::from_str(&node_id).map_err(|e| {
anyhow::anyhow!("Failed to parse bootstrap node id \"{node_id}\": {e}\nMust be a valid base32-encoded iroh node id.")
}))
.collect::<Result<_, _>>()?;
let topic = match (topic, raw_topic) {
(Some(topic), None) => blake3::hash(topic.as_bytes()).into(),
(None, Some(raw_topic)) => {
let mut slice = [0; 32];
hex::decode_to_slice(raw_topic, &mut slice)
.context("failed to decode raw topic")?;
slice.into()
}
_ => anyhow::bail!("either topic or raw_topic must be provided"),
};
let opts = SubscribeOpts {
bootstrap,
subscription_capacity: 1024,
};
let (mut sink, mut stream) = gossip.subscribe_with_opts(topic, opts).await?;
let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines();
loop {
tokio::select! {
line = input_lines.next_line() => {
let line = line.context("failed to read from stdin")?;
if let Some(line) = line {
sink.send(crate::net::Command::Broadcast(line.into())).await?;
} else {
break;
}
}
res = stream.next() => {
let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?;
match res {
crate::net::Event::Gossip(event) => {
if verbose {
println!("{:?}", event);
} else if let crate::net::GossipEvent::Received(crate::net::Message { content, .. }) = event {
println!("{:?}", content);
}
}
crate::net::Event::Lagged => {
anyhow::bail!("gossip stream lagged");
}
};
}
}
}
}
}
Ok(())
}
}