iroh_gossip/
cli.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
//! Define the gossiping subcommands.

use std::str::FromStr as _;

use anyhow::{Context, Result};
use clap::{ArgGroup, Subcommand};
use futures_lite::StreamExt;
use futures_util::SinkExt;
use iroh::NodeId;
use tokio::io::AsyncBufReadExt;

use crate::rpc::client::SubscribeOpts;

/// Commands to manage gossiping.
#[derive(Subcommand, Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum GossipCommands {
    /// Subscribe to a gossip topic
    #[command(
        long_about = r#"Subscribe to a gossip topic

Example usage:

    $ <cmd> gossip subscribe --topic test --start

This will print the current node's id. Open another terminal
or another machine and you can join the same topic:

    # on another machine/terminal
    $ <cmd> gossip subscribe --topic test <other node_id> --start

Any lines entered in stdin will be sent to the given topic
and received messages will be printed to stdout line-by-line.

The process waits for Ctrl+C to exit."#,
        group(
            ArgGroup::new("input")
                .required(true)
                .args(&["topic", "raw_topic"])
        )
    )]
    Subscribe {
        /// The topic to subscribe to.
        ///
        /// This will be hashed with BLAKE3 to get the actual topic ID.
        #[clap(long)]
        topic: Option<String>,
        /// The raw topic to subscribe to as hex. Needs to be 32 bytes, i.e. 64 hex characters.
        #[clap(long)]
        raw_topic: Option<String>,
        /// The set of nodes that are also part of the gossip swarm to bootstrap with.
        ///
        /// If empty, this will bootstrap a new swarm. Running the command will print
        /// the node's `NodeId`, which can be used as the bootstrap argument in other nodes.
        bootstrap: Vec<String>,
        /// If enabled, all gossip events will be printed, including neighbor up/down events.
        #[clap(long, short)]
        verbose: bool,
    },
}

impl GossipCommands {
    /// Runs the gossip command given the iroh client.
    pub async fn run(self, gossip: &crate::rpc::client::Client) -> Result<()> {
        match self {
            Self::Subscribe {
                topic,
                raw_topic,
                bootstrap,
                verbose,
            } => {
                let bootstrap = bootstrap
                    .into_iter()
                    .map(|node_id| NodeId::from_str(&node_id).map_err(|e| {
                        anyhow::anyhow!("Failed to parse bootstrap node id \"{node_id}\": {e}\nMust be a valid base32-encoded iroh node id.")
                    }))
                    .collect::<Result<_, _>>()?;

                let topic = match (topic, raw_topic) {
                    (Some(topic), None) => blake3::hash(topic.as_bytes()).into(),
                    (None, Some(raw_topic)) => {
                        let mut slice = [0; 32];
                        hex::decode_to_slice(raw_topic, &mut slice)
                            .context("failed to decode raw topic")?;
                        slice.into()
                    }
                    _ => anyhow::bail!("either topic or raw_topic must be provided"),
                };

                let opts = SubscribeOpts {
                    bootstrap,
                    subscription_capacity: 1024,
                };

                let (mut sink, mut stream) = gossip.subscribe_with_opts(topic, opts).await?;
                let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines();
                loop {
                    tokio::select! {
                        line = input_lines.next_line() => {
                            let line = line.context("failed to read from stdin")?;
                            if let Some(line) = line {
                                sink.send(crate::net::Command::Broadcast(line.into())).await?;
                            } else {
                                break;
                            }
                        }
                        res = stream.next() => {
                            let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?;
                            match res {
                                crate::net::Event::Gossip(event) => {
                                    if verbose {
                                        println!("{:?}", event);
                                    } else if let crate::net::GossipEvent::Received(crate::net::Message { content, .. }) = event {
                                        println!("{:?}", content);
                                    }
                                }
                                crate::net::Event::Lagged => {
                                    anyhow::bail!("gossip stream lagged");
                                }
                            };
                        }
                    }
                }
            }
        }
        Ok(())
    }
}