iroh_docs/engine/
state.rs

1use std::{
2    collections::BTreeMap,
3    time::{Instant, SystemTime},
4};
5
6use anyhow::Result;
7use iroh::EndpointId;
8use serde::{Deserialize, Serialize};
9use tracing::{debug, warn};
10
11use crate::{
12    net::{AbortReason, AcceptOutcome, SyncFinished},
13    NamespaceId,
14};
15
/// Why we started a sync request
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
pub enum SyncReason {
    /// Direct join request via API
    DirectJoin,
    /// Peer showed up as new neighbor in the gossip swarm
    NewNeighbor,
    /// We synced after receiving a sync report that indicated news for us
    SyncReport,
    /// We received a sync report while a sync was running, so run again afterwards
    Resync,
}
28
/// Why we performed a sync exchange
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum Origin {
    /// We initiated the exchange (dialed the node); carries the reason why we started it
    Connect(SyncReason),
    /// A node connected to us and we accepted the exchange
    Accept,
}
37
/// The state we're in for a node and a namespace
#[derive(Debug, Clone, Default)]
pub enum SyncState {
    /// No sync with the node is currently running for this namespace.
    #[default]
    Idle,
    /// A sync with the node is currently running for this namespace.
    Running {
        /// Wall-clock time at which the sync was marked as running.
        start: SystemTime,
        /// Whether we dialed the node or accepted its request.
        origin: Origin,
    },
}
48
/// Contains an entry for each active (syncing) namespace, and in there an entry for each node we
/// synced with.
///
/// Node entries are created lazily, the first time a sync with that node is started, accepted or
/// finished for a syncing namespace.
#[derive(Default)]
pub struct NamespaceStates(BTreeMap<NamespaceId, NamespaceState>);
53
54#[derive(Default)]
55struct NamespaceState {
56    nodes: BTreeMap<EndpointId, PeerState>,
57    may_emit_ready: bool,
58}
59
60impl NamespaceStates {
61    /// Are we syncing this namespace?
62    pub fn is_syncing(&self, namespace: &NamespaceId) -> bool {
63        self.0.contains_key(namespace)
64    }
65
66    /// Insert a namespace into the set of syncing namespaces.
67    pub fn insert(&mut self, namespace: NamespaceId) {
68        self.0.entry(namespace).or_default();
69    }
70
71    /// Start a sync request.
72    ///
73    /// Returns true if the request should be performed, and false if it should be aborted.
74    pub fn start_connect(
75        &mut self,
76        namespace: &NamespaceId,
77        node: EndpointId,
78        reason: SyncReason,
79    ) -> bool {
80        match self.entry(namespace, node) {
81            None => {
82                debug!("abort connect: namespace is not in sync set");
83                false
84            }
85            Some(state) => state.start_connect(reason),
86        }
87    }
88
89    /// Accept a sync request.
90    ///
91    /// Returns the [`AcceptOutcome`] to be performed.
92    pub fn accept_request(
93        &mut self,
94        me: &EndpointId,
95        namespace: &NamespaceId,
96        node: EndpointId,
97    ) -> AcceptOutcome {
98        let Some(state) = self.entry(namespace, node) else {
99            return AcceptOutcome::Reject(AbortReason::NotFound);
100        };
101        state.accept_request(me, &node)
102    }
103
104    /// Insert a finished sync operation into the state.
105    ///
106    /// Returns the time when the operation was started, and a `bool` that is true if another sync
107    /// request should be triggered right afterwards.
108    ///
109    /// Returns `None` if the namespace is not syncing or the sync state doesn't expect a finish
110    /// event.
111    pub fn finish(
112        &mut self,
113        namespace: &NamespaceId,
114        node: EndpointId,
115        origin: &Origin,
116        result: Result<SyncFinished>,
117    ) -> Option<(SystemTime, bool)> {
118        let state = self.entry(namespace, node)?;
119        state.finish(origin, result)
120    }
121
122    /// Set whether a [`super::live::Event::PendingContentReady`] may be emitted once the pending queue
123    /// becomes empty.
124    ///
125    /// This should be set to `true` if there are pending content hashes after a sync finished, and
126    /// to `false` whenever a `PendingContentReady` was emitted.
127    pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> {
128        let state = self.0.get_mut(namespace)?;
129        state.may_emit_ready = value;
130        Some(())
131    }
132    /// Returns whether a [`super::live::Event::PendingContentReady`] event may be emitted once the
133    /// pending queue becomes empty.
134    ///
135    /// If this returns `false`, an event should not be emitted even if the queue becomes empty,
136    /// because a currently running sync did not yet terminate. Once it terminates, the event will
137    /// be emitted from the handler for finished syncs.
138    pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option<bool> {
139        let state = self.0.get_mut(namespace)?;
140        if state.may_emit_ready {
141            state.may_emit_ready = false;
142            Some(true)
143        } else {
144            Some(false)
145        }
146    }
147
148    /// Remove a namespace from the set of syncing namespaces.
149    pub fn remove(&mut self, namespace: &NamespaceId) -> bool {
150        self.0.remove(namespace).is_some()
151    }
152
153    /// Get the [`PeerState`] for a namespace and node.
154    /// If the namespace is syncing and the node so far unknown, initialize and return a default [`PeerState`].
155    /// If the namespace is not syncing return None.
156    fn entry(&mut self, namespace: &NamespaceId, node: EndpointId) -> Option<&mut PeerState> {
157        self.0
158            .get_mut(namespace)
159            .map(|n| n.nodes.entry(node).or_default())
160    }
161}
162
163/// State of a node with regard to a namespace.
164#[derive(Default)]
165struct PeerState {
166    state: SyncState,
167    resync_requested: bool,
168    last_sync: Option<(Instant, Result<SyncFinished>)>,
169}
170
171impl PeerState {
172    fn finish(
173        &mut self,
174        origin: &Origin,
175        result: Result<SyncFinished>,
176    ) -> Option<(SystemTime, bool)> {
177        let start = match &self.state {
178            SyncState::Running {
179                start,
180                origin: origin2,
181            } => {
182                if origin2 != origin {
183                    warn!(actual = ?origin, expected = ?origin2, "finished sync origin does not match state")
184                }
185                Some(*start)
186            }
187            SyncState::Idle => {
188                warn!("sync state finish called but not in running state");
189                None
190            }
191        };
192
193        self.last_sync = Some((Instant::now(), result));
194        self.state = SyncState::Idle;
195        start.map(|s| (s, self.resync_requested))
196    }
197
198    fn start_connect(&mut self, reason: SyncReason) -> bool {
199        debug!(?reason, "start connect");
200        match self.state {
201            // never run two syncs at the same time
202            SyncState::Running { .. } => {
203                debug!("abort connect: sync already running");
204                if matches!(reason, SyncReason::SyncReport) {
205                    debug!("resync queued");
206                    self.resync_requested = true;
207                }
208                false
209            }
210            SyncState::Idle => {
211                self.set_sync_running(Origin::Connect(reason));
212                true
213            }
214        }
215    }
216
217    fn accept_request(&mut self, me: &EndpointId, node: &EndpointId) -> AcceptOutcome {
218        let outcome = match &self.state {
219            SyncState::Idle => AcceptOutcome::Allow,
220            SyncState::Running { origin, .. } => match origin {
221                Origin::Accept => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
222                // Incoming sync request while we are dialing ourselves.
223                // In this case, compare the binary representations of our and the other node's id
224                // to deterministically decide which of the two concurrent connections will succeed.
225                Origin::Connect(_reason) => match expected_sync_direction(me, node) {
226                    SyncDirection::Accept => AcceptOutcome::Allow,
227                    SyncDirection::Connect => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
228                },
229            },
230        };
231        if let AcceptOutcome::Allow = outcome {
232            self.set_sync_running(Origin::Accept);
233        }
234        outcome
235    }
236
237    fn set_sync_running(&mut self, origin: Origin) {
238        self.state = SyncState::Running {
239            origin,
240            start: SystemTime::now(),
241        };
242        self.resync_requested = false;
243    }
244}
245
/// Which side of a sync exchange this node takes when both nodes dial each other concurrently.
///
/// A small fieldless enum, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncDirection {
    /// Accept the incoming request from the other node.
    Accept,
    /// Keep our own outgoing connection and reject the incoming request.
    Connect,
}
251
252fn expected_sync_direction(self_node_id: &EndpointId, other_node_id: &EndpointId) -> SyncDirection {
253    if self_node_id.as_bytes() > other_node_id.as_bytes() {
254        SyncDirection::Accept
255    } else {
256        SyncDirection::Connect
257    }
258}