#![cfg(any(test, debug_assertions))]
use super::*;
impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
#[track_caller]
pub(in crate::downloader) fn check_invariants(&self) {
self.check_active_request_count();
self.check_queued_requests_consistency();
self.check_idle_peer_consistency();
self.check_concurrency_limits();
self.check_provider_map_prunning();
}
#[track_caller]
fn check_concurrency_limits(&self) {
let ConcurrencyLimits {
max_concurrent_requests,
max_concurrent_requests_per_node,
max_open_connections,
max_concurrent_dials_per_hash,
} = &self.concurrency_limits;
assert!(
self.in_progress_downloads.len() <= *max_concurrent_requests,
"max_concurrent_requests exceeded"
);
tracing::trace!(
"limits: conns: {}/{} | reqs: {}/{}",
self.connections_count(),
max_open_connections,
self.in_progress_downloads.len(),
max_concurrent_requests
);
assert!(
self.connections_count() <= *max_open_connections,
"max_open_connections exceeded"
);
for (node, info) in self.connected_nodes.iter() {
assert!(
info.active_requests() <= *max_concurrent_requests_per_node,
"max_concurrent_requests_per_node exceeded for {node}"
)
}
if let Some(kind) = self.queue.front() {
let hash = kind.hash();
let nodes = self.providers.get_candidates(&hash);
let mut dialing = 0;
for node in nodes {
if self.dialer.is_pending(node) {
dialing += 1;
}
}
assert!(
dialing <= *max_concurrent_dials_per_hash,
"max_concurrent_dials_per_hash exceeded for {hash}"
)
}
}
#[track_caller]
fn check_active_request_count(&self) {
assert_eq!(
self.active_requests.len(),
self.in_progress_downloads.len(),
"active_requests and in_progress_downloads are out of sync"
);
let mut real_count: HashMap<NodeId, usize> =
HashMap::with_capacity(self.connected_nodes.len());
for req_info in self.active_requests.values() {
*real_count.entry(req_info.node).or_default() += 1;
}
for (peer, info) in self.connected_nodes.iter() {
assert_eq!(
info.active_requests(),
real_count.get(peer).copied().unwrap_or_default(),
"mismatched count of active requests for {peer}"
)
}
}
#[track_caller]
fn check_queued_requests_consistency(&self) {
for entry in self.queue.iter() {
assert!(
self.providers
.get_candidates(&entry.hash())
.next()
.is_some(),
"all queued requests have providers"
);
assert!(
self.requests.contains_key(entry),
"all queued requests have request info"
);
}
for entry in self.queue.iter_parked() {
assert!(
matches!(self.next_step(entry), NextStep::Park),
"all parked downloads evaluate to the correct next step"
);
assert!(
self.providers
.get_candidates(&entry.hash())
.all(|node| matches!(self.node_state(node), NodeState::WaitForRetry)),
"all parked downloads have only retrying nodes"
);
}
}
#[track_caller]
fn check_idle_peer_consistency(&self) {
let idle_peers = self
.connected_nodes
.values()
.filter(|info| info.active_requests() == 0)
.count();
assert_eq!(
self.goodbye_nodes_queue.len(),
idle_peers,
"inconsistent count of idle peers"
);
}
#[track_caller]
fn check_provider_map_prunning(&self) {
for hash in self.providers.hash_node.keys() {
let as_raw = DownloadKind(HashAndFormat::raw(*hash));
let as_hash_seq = DownloadKind(HashAndFormat::hash_seq(*hash));
assert!(
self.queue.contains_hash(*hash)
|| self.active_requests.contains_key(&as_raw)
|| self.active_requests.contains_key(&as_hash_seq),
"all hashes in the provider map are in the queue or active"
)
}
}
}