iroh_docs/engine/
state.rs

1use std::{
2    collections::BTreeMap,
3    time::{Instant, SystemTime},
4};
5
6use anyhow::Result;
7use iroh::NodeId;
8use serde::{Deserialize, Serialize};
9use tracing::{debug, warn};
10
11use crate::{
12    net::{AbortReason, AcceptOutcome, SyncFinished},
13    NamespaceId,
14};
15
16/// Why we started a sync request
17#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
18pub enum SyncReason {
19    /// Direct join request via API
20    DirectJoin,
21    /// Peer showed up as new neighbor in the gossip swarm
22    NewNeighbor,
23    /// We synced after receiving a sync report that indicated news for us
24    SyncReport,
25    /// We received a sync report while a sync was running, so run again afterwars
26    Resync,
27}
28
29/// Why we performed a sync exchange
30#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
31pub enum Origin {
32    /// We initiated the exchange
33    Connect(SyncReason),
34    /// A node connected to us and we accepted the exchange
35    Accept,
36}
37
38/// The state we're in for a node and a namespace
39#[derive(Debug, Clone)]
40pub enum SyncState {
41    Idle,
42    Running { start: SystemTime, origin: Origin },
43}
44
45impl Default for SyncState {
46    fn default() -> Self {
47        Self::Idle
48    }
49}
50
51/// Contains an entry for each active (syncing) namespace, and in there an entry for each node we
52/// synced with.
53#[derive(Default)]
54pub struct NamespaceStates(BTreeMap<NamespaceId, NamespaceState>);
55
56#[derive(Default)]
57struct NamespaceState {
58    nodes: BTreeMap<NodeId, PeerState>,
59    may_emit_ready: bool,
60}
61
62impl NamespaceStates {
63    /// Are we syncing this namespace?
64    pub fn is_syncing(&self, namespace: &NamespaceId) -> bool {
65        self.0.contains_key(namespace)
66    }
67
68    /// Insert a namespace into the set of syncing namespaces.
69    pub fn insert(&mut self, namespace: NamespaceId) {
70        self.0.entry(namespace).or_default();
71    }
72
73    /// Start a sync request.
74    ///
75    /// Returns true if the request should be performed, and false if it should be aborted.
76    pub fn start_connect(
77        &mut self,
78        namespace: &NamespaceId,
79        node: NodeId,
80        reason: SyncReason,
81    ) -> bool {
82        match self.entry(namespace, node) {
83            None => {
84                debug!("abort connect: namespace is not in sync set");
85                false
86            }
87            Some(state) => state.start_connect(reason),
88        }
89    }
90
91    /// Accept a sync request.
92    ///
93    /// Returns the [`AcceptOutcome`] to be performed.
94    pub fn accept_request(
95        &mut self,
96        me: &NodeId,
97        namespace: &NamespaceId,
98        node: NodeId,
99    ) -> AcceptOutcome {
100        let Some(state) = self.entry(namespace, node) else {
101            return AcceptOutcome::Reject(AbortReason::NotFound);
102        };
103        state.accept_request(me, &node)
104    }
105
106    /// Insert a finished sync operation into the state.
107    ///
108    /// Returns the time when the operation was started, and a `bool` that is true if another sync
109    /// request should be triggered right afterwards.
110    ///
111    /// Returns `None` if the namespace is not syncing or the sync state doesn't expect a finish
112    /// event.
113    pub fn finish(
114        &mut self,
115        namespace: &NamespaceId,
116        node: NodeId,
117        origin: &Origin,
118        result: Result<SyncFinished>,
119    ) -> Option<(SystemTime, bool)> {
120        let state = self.entry(namespace, node)?;
121        state.finish(origin, result)
122    }
123
124    /// Set whether a [`super::live::Event::PendingContentReady`] may be emitted once the pending queue
125    /// becomes empty.
126    ///
127    /// This should be set to `true` if there are pending content hashes after a sync finished, and
128    /// to `false` whenever a `PendingContentReady` was emitted.
129    pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> {
130        let state = self.0.get_mut(namespace)?;
131        state.may_emit_ready = value;
132        Some(())
133    }
134    /// Returns whether a [`super::live::Event::PendingContentReady`] event may be emitted once the
135    /// pending queue becomes empty.
136    ///
137    /// If this returns `false`, an event should not be emitted even if the queue becomes empty,
138    /// because a currently running sync did not yet terminate. Once it terminates, the event will
139    /// be emitted from the handler for finished syncs.
140    pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option<bool> {
141        let state = self.0.get_mut(namespace)?;
142        if state.may_emit_ready {
143            state.may_emit_ready = false;
144            Some(true)
145        } else {
146            Some(false)
147        }
148    }
149
150    /// Remove a namespace from the set of syncing namespaces.
151    pub fn remove(&mut self, namespace: &NamespaceId) -> bool {
152        self.0.remove(namespace).is_some()
153    }
154
155    /// Get the [`PeerState`] for a namespace and node.
156    /// If the namespace is syncing and the node so far unknown, initialize and return a default [`PeerState`].
157    /// If the namespace is not syncing return None.
158    fn entry(&mut self, namespace: &NamespaceId, node: NodeId) -> Option<&mut PeerState> {
159        self.0
160            .get_mut(namespace)
161            .map(|n| n.nodes.entry(node).or_default())
162    }
163}
164
165/// State of a node with regard to a namespace.
166#[derive(Default)]
167struct PeerState {
168    state: SyncState,
169    resync_requested: bool,
170    last_sync: Option<(Instant, Result<SyncFinished>)>,
171}
172
173impl PeerState {
174    fn finish(
175        &mut self,
176        origin: &Origin,
177        result: Result<SyncFinished>,
178    ) -> Option<(SystemTime, bool)> {
179        let start = match &self.state {
180            SyncState::Running {
181                start,
182                origin: origin2,
183            } => {
184                if origin2 != origin {
185                    warn!(actual = ?origin, expected = ?origin2, "finished sync origin does not match state")
186                }
187                Some(*start)
188            }
189            SyncState::Idle => {
190                warn!("sync state finish called but not in running state");
191                None
192            }
193        };
194
195        self.last_sync = Some((Instant::now(), result));
196        self.state = SyncState::Idle;
197        start.map(|s| (s, self.resync_requested))
198    }
199
200    fn start_connect(&mut self, reason: SyncReason) -> bool {
201        debug!(?reason, "start connect");
202        match self.state {
203            // never run two syncs at the same time
204            SyncState::Running { .. } => {
205                debug!("abort connect: sync already running");
206                if matches!(reason, SyncReason::SyncReport) {
207                    debug!("resync queued");
208                    self.resync_requested = true;
209                }
210                false
211            }
212            SyncState::Idle => {
213                self.set_sync_running(Origin::Connect(reason));
214                true
215            }
216        }
217    }
218
219    fn accept_request(&mut self, me: &NodeId, node: &NodeId) -> AcceptOutcome {
220        let outcome = match &self.state {
221            SyncState::Idle => AcceptOutcome::Allow,
222            SyncState::Running { origin, .. } => match origin {
223                Origin::Accept => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
224                // Incoming sync request while we are dialing ourselves.
225                // In this case, compare the binary representations of our and the other node's id
226                // to deterministically decide which of the two concurrent connections will succeed.
227                Origin::Connect(_reason) => match expected_sync_direction(me, node) {
228                    SyncDirection::Accept => AcceptOutcome::Allow,
229                    SyncDirection::Connect => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
230                },
231            },
232        };
233        if let AcceptOutcome::Allow = outcome {
234            self.set_sync_running(Origin::Accept);
235        }
236        outcome
237    }
238
239    fn set_sync_running(&mut self, origin: Origin) {
240        self.state = SyncState::Running {
241            origin,
242            start: SystemTime::now(),
243        };
244        self.resync_requested = false;
245    }
246}
247
248#[derive(Debug)]
249enum SyncDirection {
250    Accept,
251    Connect,
252}
253
254fn expected_sync_direction(self_node_id: &NodeId, other_node_id: &NodeId) -> SyncDirection {
255    if self_node_id.as_bytes() > other_node_id.as_bytes() {
256        SyncDirection::Accept
257    } else {
258        SyncDirection::Connect
259    }
260}