1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
//! Invariants for the service.

#![cfg(any(test, debug_assertions))]

use super::*;

/// invariants for the service.
impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
    /// Checks the various invariants the service must maintain
    #[track_caller]
    pub(in crate::downloader) fn check_invariants(&self) {
        self.check_active_request_count();
        self.check_queued_requests_consistency();
        self.check_idle_peer_consistency();
        self.check_concurrency_limits();
        self.check_provider_map_prunning();
    }

    /// Checks concurrency limits are maintained.
    #[track_caller]
    fn check_concurrency_limits(&self) {
        let ConcurrencyLimits {
            max_concurrent_requests,
            max_concurrent_requests_per_node,
            max_open_connections,
            max_concurrent_dials_per_hash,
        } = &self.concurrency_limits;

        // check the total number of active requests to ensure it stays within the limit
        assert!(
            self.in_progress_downloads.len() <= *max_concurrent_requests,
            "max_concurrent_requests exceeded"
        );

        // check that the open and dialing peers don't exceed the connection capacity
        tracing::trace!(
            "limits: conns: {}/{} | reqs: {}/{}",
            self.connections_count(),
            max_open_connections,
            self.in_progress_downloads.len(),
            max_concurrent_requests
        );
        assert!(
            self.connections_count() <= *max_open_connections,
            "max_open_connections exceeded"
        );

        // check the active requests per peer don't exceed the limit
        for (node, info) in self.connected_nodes.iter() {
            assert!(
                info.active_requests() <= *max_concurrent_requests_per_node,
                "max_concurrent_requests_per_node exceeded for {node}"
            )
        }

        // check that we do not dial more nodes than allowed for the next pending hashes
        if let Some(kind) = self.queue.front() {
            let hash = kind.hash();
            let nodes = self.providers.get_candidates(&hash);
            let mut dialing = 0;
            for node in nodes {
                if self.dialer.is_pending(node) {
                    dialing += 1;
                }
            }
            assert!(
                dialing <= *max_concurrent_dials_per_hash,
                "max_concurrent_dials_per_hash exceeded for {hash}"
            )
        }
    }

    /// Checks that the count of active requests per peer is consistent with the active requests,
    /// and that active request are consistent with download futures
    #[track_caller]
    fn check_active_request_count(&self) {
        // check that the count of futures we are polling for downloads is consistent with the
        // number of requests
        assert_eq!(
            self.active_requests.len(),
            self.in_progress_downloads.len(),
            "active_requests and in_progress_downloads are out of sync"
        );
        // check that the count of requests per peer matches the number of requests that have that
        // peer as active
        let mut real_count: HashMap<NodeId, usize> =
            HashMap::with_capacity(self.connected_nodes.len());
        for req_info in self.active_requests.values() {
            // nothing like some classic word count
            *real_count.entry(req_info.node).or_default() += 1;
        }
        for (peer, info) in self.connected_nodes.iter() {
            assert_eq!(
                info.active_requests(),
                real_count.get(peer).copied().unwrap_or_default(),
                "mismatched count of active requests for {peer}"
            )
        }
    }

    /// Checks that the queued requests all appear in the provider map and request map.
    #[track_caller]
    fn check_queued_requests_consistency(&self) {
        // check that all hashes in the queue have candidates
        for entry in self.queue.iter() {
            assert!(
                self.providers
                    .get_candidates(&entry.hash())
                    .next()
                    .is_some(),
                "all queued requests have providers"
            );
            assert!(
                self.requests.contains_key(entry),
                "all queued requests have request info"
            );
        }

        // check that all parked hashes should be parked
        for entry in self.queue.iter_parked() {
            assert!(
                matches!(self.next_step(entry), NextStep::Park),
                "all parked downloads evaluate to the correct next step"
            );
            assert!(
                self.providers
                    .get_candidates(&entry.hash())
                    .all(|node| matches!(self.node_state(node), NodeState::WaitForRetry)),
                "all parked downloads have only retrying nodes"
            );
        }
    }

    /// Check that peers queued to be disconnected are consistent with peers considered idle.
    #[track_caller]
    fn check_idle_peer_consistency(&self) {
        let idle_peers = self
            .connected_nodes
            .values()
            .filter(|info| info.active_requests() == 0)
            .count();
        assert_eq!(
            self.goodbye_nodes_queue.len(),
            idle_peers,
            "inconsistent count of idle peers"
        );
    }

    /// Check that every hash in the provider map is needed.
    #[track_caller]
    fn check_provider_map_prunning(&self) {
        for hash in self.providers.hash_node.keys() {
            let as_raw = DownloadKind(HashAndFormat::raw(*hash));
            let as_hash_seq = DownloadKind(HashAndFormat::hash_seq(*hash));
            assert!(
                self.queue.contains_hash(*hash)
                    || self.active_requests.contains_key(&as_raw)
                    || self.active_requests.contains_key(&as_hash_seq),
                "all hashes in the provider map are in the queue or active"
            )
        }
    }
}