1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use anyhow::Result;
use iroh_base::hash::Hash;

use super::{
    bounds::{ByKeyBounds, RecordsBounds},
    ranges::{RecordsByKeyRange, RecordsRange},
    RecordsValue,
};
use crate::{
    store::{
        fs::tables::ReadOnlyTables,
        util::{IndexKind, LatestPerKeySelector, SelectorRes},
        AuthorFilter, KeyFilter, Query,
    },
    AuthorId, NamespaceId, SignedEntry,
};

/// A query iterator for entry queries.
///
/// Yields `Result<SignedEntry>` items for a single namespace, applying the
/// filters, offset and limit of the wrapped [`Query`].
#[derive(Debug)]
pub struct QueryIterator {
    /// The underlying table range together with the filter state that is
    /// applied to each candidate entry.
    range: QueryRange,
    /// The query being executed; consulted for limit, offset, sort direction
    /// and whether empty entries are included.
    query: Query,
    /// Number of matching entries skipped so far to honor the query offset.
    offset: u64,
    /// Number of times `next` has returned an item past the limit check;
    /// compared against the query limit.
    count: u64,
}

/// The concrete range a [`QueryIterator`] walks, depending on which index
/// was selected for the query.
#[derive(Debug)]
enum QueryRange {
    /// Iterate a range of the records table.
    AuthorKey {
        /// Range over the records table.
        range: RecordsRange<'static>,
        /// Key filter evaluated for every entry in the range. Set to
        /// `KeyFilter::Any` when the key constraint is already expressed by
        /// the range bounds.
        key_filter: KeyFilter,
    },
    /// Iterate a range of the by-key index.
    KeyAuthor {
        /// Range built from the by-key index and the records table.
        range: RecordsByKeyRange,
        /// Author filter evaluated for every entry in the range.
        author_filter: AuthorFilter,
        /// When set, entries are pushed through the selector, which decides
        /// which of them are emitted (used for "latest per key" queries).
        selector: Option<LatestPerKeySelector>,
    },
}

impl QueryIterator {
    /// Creates an iterator that runs `query` against `namespace` using the
    /// given read-only tables.
    ///
    /// The index to iterate is chosen via [`IndexKind::from`]; the table range
    /// is opened eagerly, the entries themselves are read lazily by `next`.
    ///
    /// # Errors
    ///
    /// Returns an error if opening the underlying table range fails.
    pub fn new(tables: ReadOnlyTables, namespace: NamespaceId, query: Query) -> Result<Self> {
        let range = match IndexKind::from(&query) {
            IndexKind::AuthorKey {
                range: author_filter,
                key_filter,
            } => {
                let (bounds, residual_key_filter) = match author_filter {
                    // No author given: scan the whole namespace and apply the key
                    // filter to every entry.
                    AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter),
                    // Exactly one author: author and key are both encoded in the range
                    // bounds, so nothing is left to filter afterwards.
                    AuthorFilter::Exact(author) => {
                        let bounds = RecordsBounds::author_key(namespace, author, key_filter);
                        (bounds, KeyFilter::Any)
                    }
                };
                QueryRange::AuthorKey {
                    range: RecordsRange::with_bounds_static(&tables.records, bounds)?,
                    key_filter: residual_key_filter,
                }
            }
            IndexKind::KeyAuthor {
                range: key_filter,
                author_filter,
                latest_per_key,
            } => {
                let bounds = ByKeyBounds::new(namespace, &key_filter);
                let by_key_range =
                    RecordsByKeyRange::with_bounds(tables.records_by_key, tables.records, bounds)?;
                // Only track the latest entry per key when the query asks for it.
                let selector = if latest_per_key {
                    Some(LatestPerKeySelector::default())
                } else {
                    None
                };
                QueryRange::KeyAuthor {
                    range: by_key_range,
                    author_filter,
                    selector,
                }
            }
        };

        Ok(Self {
            range,
            query,
            offset: 0,
            count: 0,
        })
    }
}

impl Iterator for QueryIterator {
    type Item = Result<SignedEntry>;

    fn next(&mut self) -> Option<Result<SignedEntry>> {
        // early-return if we reached the query limit.
        if let Some(limit) = self.query.limit() {
            if self.count >= limit {
                return None;
            }
        }
        loop {
            let next = match &mut self.range {
                QueryRange::AuthorKey { range, key_filter } => {
                    // get the next entry from the query range, filtered by the key and empty filters
                    range.next_filtered(&self.query.sort_direction, |(_ns, _author, key), value| {
                        key_filter.matches(key)
                            && (self.query.include_empty || !value_is_empty(&value))
                    })
                }

                QueryRange::KeyAuthor {
                    range,
                    author_filter,
                    selector,
                } => loop {
                    // get the next entry from the query range, filtered by the author filter
                    let next = range
                        .next_filtered(&self.query.sort_direction, |(_ns, _key, author)| {
                            author_filter.matches(&(AuthorId::from(author)))
                        });

                    // early-break if next contains Err
                    let next = match next.transpose() {
                        Err(err) => break Some(Err(err)),
                        Ok(next) => next,
                    };

                    // push the entry into the selector. if active, only the latest entry
                    // for each key will be emitted.
                    let next = match selector {
                        None => next,
                        Some(selector) => match selector.push(next) {
                            SelectorRes::Continue => continue,
                            SelectorRes::Finished => None,
                            SelectorRes::Some(res) => Some(res),
                        },
                    };

                    // skip the entry if empty and no empty entries requested
                    if !self.query.include_empty && matches!(&next, Some(e) if e.is_empty()) {
                        continue;
                    }

                    break next.map(Result::Ok);
                },
            };

            // skip the entry if we didn't get past the requested offset yet.
            if self.offset < self.query.offset() && matches!(next, Some(Ok(_))) {
                self.offset += 1;
                continue;
            }

            self.count += 1;
            return next;
        }
    }
}

/// Returns `true` if the hash stored in the last field of `value` equals
/// [`Hash::EMPTY`].
fn value_is_empty(value: &RecordsValue) -> bool {
    // Only the trailing hash field matters; the other fields are ignored.
    let (.., content_hash) = value;
    *content_hash == Hash::EMPTY.as_bytes()
}