1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//! [`Getter`] implementation that performs requests over [`Connection`]s.
//!
//! [`Connection`]: iroh_net::endpoint::Connection

use futures_lite::FutureExt;
use iroh_net::endpoint;

use super::{progress::BroadcastProgressSender, DownloadKind, FailureAction, GetStartFut, Getter};
use crate::{
    get::{db::get_to_db_in_steps, error::GetError},
    store::Store,
};

impl From<GetError> for FailureAction {
    fn from(e: GetError) -> Self {
        match e {
            e @ GetError::NotFound(_) => FailureAction::AbortRequest(e.into()),
            e @ GetError::RemoteReset(_) => FailureAction::RetryLater(e.into()),
            e @ GetError::NoncompliantNode(_) => FailureAction::DropPeer(e.into()),
            e @ GetError::Io(_) => FailureAction::RetryLater(e.into()),
            e @ GetError::BadRequest(_) => FailureAction::AbortRequest(e.into()),
            // TODO: what do we want to do on local failures?
            e @ GetError::LocalFailure(_) => FailureAction::AbortRequest(e.into()),
        }
    }
}

/// [`Getter`] implementation that performs requests over [`Connection`]s.
///
/// [`Connection`]: iroh_net::endpoint::Connection
pub(crate) struct IoGetter<S: Store> {
    pub store: S,
}

impl<S: Store> Getter for IoGetter<S> {
    type Connection = endpoint::Connection;
    type NeedsConn = crate::get::db::GetStateNeedsConn;

    fn get(
        &mut self,
        kind: DownloadKind,
        progress_sender: BroadcastProgressSender,
    ) -> GetStartFut<Self::NeedsConn> {
        let store = self.store.clone();
        async move {
            match get_to_db_in_steps(store, kind.hash_and_format(), progress_sender).await {
                Err(err) => Err(err.into()),
                Ok(crate::get::db::GetState::Complete(stats)) => {
                    Ok(super::GetOutput::Complete(stats))
                }
                Ok(crate::get::db::GetState::NeedsConn(needs_conn)) => {
                    Ok(super::GetOutput::NeedsConn(needs_conn))
                }
            }
        }
        .boxed_local()
    }
}

impl super::NeedsConn<endpoint::Connection> for crate::get::db::GetStateNeedsConn {
    fn proceed(self, conn: endpoint::Connection) -> super::GetProceedFut {
        async move {
            let res = self.proceed(conn).await;
            #[cfg(feature = "metrics")]
            track_metrics(&res);
            match res {
                Ok(stats) => Ok(stats),
                Err(err) => Err(err.into()),
            }
        }
        .boxed_local()
    }
}

#[cfg(feature = "metrics")]
fn track_metrics(res: &Result<crate::get::Stats, GetError>) {
    use iroh_metrics::{inc, inc_by};

    use crate::metrics::Metrics;
    match res {
        Ok(stats) => {
            let crate::get::Stats {
                bytes_written,
                bytes_read: _,
                elapsed,
            } = stats;

            inc!(Metrics, downloads_success);
            inc_by!(Metrics, download_bytes_total, *bytes_written);
            inc_by!(Metrics, download_time_total, elapsed.as_millis() as u64);
        }
        Err(e) => match &e {
            GetError::NotFound(_) => inc!(Metrics, downloads_notfound),
            _ => inc!(Metrics, downloads_error),
        },
    }
}