// iroh_docs/engine/state.rs

use std::{collections::BTreeMap, mem};

use anyhow::Result;
use iroh::EndpointId;
use n0_future::time::{Instant, SystemTime};
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};

use crate::{
    net::{AbortReason, AcceptOutcome, SyncFinished},
    NamespaceId,
};

14/// Why we started a sync request
15#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
16pub enum SyncReason {
17    /// Direct join request via API
18    DirectJoin,
19    /// Peer showed up as new neighbor in the gossip swarm
20    NewNeighbor,
21    /// We synced after receiving a sync report that indicated news for us
22    SyncReport,
23    /// We received a sync report while a sync was running, so run again afterwars
24    Resync,
25}
26
27/// Why we performed a sync exchange
28#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
29pub enum Origin {
30    /// We initiated the exchange
31    Connect(SyncReason),
32    /// A node connected to us and we accepted the exchange
33    Accept,
34}
35
36/// The state we're in for a node and a namespace
37#[derive(Default, Debug, Clone)]
38pub enum SyncState {
39    #[default]
40    Idle,
41    Running {
42        start: SystemTime,
43        origin: Origin,
44    },
45}
46
47/// Contains an entry for each active (syncing) namespace, and in there an entry for each node we
48/// synced with.
49#[derive(Default)]
50pub struct NamespaceStates(BTreeMap<NamespaceId, NamespaceState>);
51
52#[derive(Default)]
53struct NamespaceState {
54    nodes: BTreeMap<EndpointId, PeerState>,
55    may_emit_ready: bool,
56}
57
58impl NamespaceStates {
59    /// Are we syncing this namespace?
60    pub fn is_syncing(&self, namespace: &NamespaceId) -> bool {
61        self.0.contains_key(namespace)
62    }
63
64    /// Insert a namespace into the set of syncing namespaces.
65    pub fn insert(&mut self, namespace: NamespaceId) {
66        self.0.entry(namespace).or_default();
67    }
68
69    /// Start a sync request.
70    ///
71    /// Returns true if the request should be performed, and false if it should be aborted.
72    pub fn start_connect(
73        &mut self,
74        namespace: &NamespaceId,
75        node: EndpointId,
76        reason: SyncReason,
77    ) -> bool {
78        match self.entry(namespace, node) {
79            None => {
80                debug!("abort connect: namespace is not in sync set");
81                false
82            }
83            Some(state) => state.start_connect(reason),
84        }
85    }
86
87    /// Accept a sync request.
88    ///
89    /// Returns the [`AcceptOutcome`] to be performed.
90    pub fn accept_request(
91        &mut self,
92        me: &EndpointId,
93        namespace: &NamespaceId,
94        node: EndpointId,
95    ) -> AcceptOutcome {
96        let Some(state) = self.entry(namespace, node) else {
97            return AcceptOutcome::Reject(AbortReason::NotFound);
98        };
99        state.accept_request(me, &node)
100    }
101
102    /// Insert a finished sync operation into the state.
103    ///
104    /// Returns the time when the operation was started, and a `bool` that is true if another sync
105    /// request should be triggered right afterwards.
106    ///
107    /// Returns `None` if the namespace is not syncing or the sync state doesn't expect a finish
108    /// event.
109    pub fn finish(
110        &mut self,
111        namespace: &NamespaceId,
112        node: EndpointId,
113        origin: &Origin,
114        result: Result<SyncFinished>,
115    ) -> Option<(SystemTime, bool)> {
116        let state = self.entry(namespace, node)?;
117        state.finish(origin, result)
118    }
119
120    /// Set whether a [`super::live::Event::PendingContentReady`] may be emitted once the pending queue
121    /// becomes empty.
122    ///
123    /// This should be set to `true` if there are pending content hashes after a sync finished, and
124    /// to `false` whenever a `PendingContentReady` was emitted.
125    pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> {
126        let state = self.0.get_mut(namespace)?;
127        state.may_emit_ready = value;
128        Some(())
129    }
130    /// Returns whether a [`super::live::Event::PendingContentReady`] event may be emitted once the
131    /// pending queue becomes empty.
132    ///
133    /// If this returns `false`, an event should not be emitted even if the queue becomes empty,
134    /// because a currently running sync did not yet terminate. Once it terminates, the event will
135    /// be emitted from the handler for finished syncs.
136    pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option<bool> {
137        let state = self.0.get_mut(namespace)?;
138        if state.may_emit_ready {
139            state.may_emit_ready = false;
140            Some(true)
141        } else {
142            Some(false)
143        }
144    }
145
146    /// Remove a namespace from the set of syncing namespaces.
147    pub fn remove(&mut self, namespace: &NamespaceId) -> bool {
148        self.0.remove(namespace).is_some()
149    }
150
151    /// Get the [`PeerState`] for a namespace and node.
152    /// If the namespace is syncing and the node so far unknown, initialize and return a default [`PeerState`].
153    /// If the namespace is not syncing return None.
154    fn entry(&mut self, namespace: &NamespaceId, node: EndpointId) -> Option<&mut PeerState> {
155        self.0
156            .get_mut(namespace)
157            .map(|n| n.nodes.entry(node).or_default())
158    }
159}
160
161/// State of a node with regard to a namespace.
162#[derive(Default)]
163struct PeerState {
164    state: SyncState,
165    resync_requested: bool,
166    last_sync: Option<(Instant, Result<SyncFinished>)>,
167}
168
169impl PeerState {
170    fn finish(
171        &mut self,
172        origin: &Origin,
173        result: Result<SyncFinished>,
174    ) -> Option<(SystemTime, bool)> {
175        let start = match &self.state {
176            SyncState::Running {
177                start,
178                origin: origin2,
179            } => {
180                if origin2 != origin {
181                    warn!(actual = ?origin, expected = ?origin2, "finished sync origin does not match state")
182                }
183                Some(*start)
184            }
185            SyncState::Idle => {
186                warn!("sync state finish called but not in running state");
187                None
188            }
189        };
190
191        self.last_sync = Some((Instant::now(), result));
192        self.state = SyncState::Idle;
193        start.map(|s| (s, self.resync_requested))
194    }
195
196    fn start_connect(&mut self, reason: SyncReason) -> bool {
197        debug!(?reason, "start connect");
198        match self.state {
199            // never run two syncs at the same time
200            SyncState::Running { .. } => {
201                debug!("abort connect: sync already running");
202                if matches!(reason, SyncReason::SyncReport) {
203                    debug!("resync queued");
204                    self.resync_requested = true;
205                }
206                false
207            }
208            SyncState::Idle => {
209                self.set_sync_running(Origin::Connect(reason));
210                true
211            }
212        }
213    }
214
215    fn accept_request(&mut self, me: &EndpointId, node: &EndpointId) -> AcceptOutcome {
216        let outcome = match &self.state {
217            SyncState::Idle => AcceptOutcome::Allow,
218            SyncState::Running { origin, .. } => match origin {
219                Origin::Accept => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
220                // Incoming sync request while we are dialing ourselves.
221                // In this case, compare the binary representations of our and the other node's id
222                // to deterministically decide which of the two concurrent connections will succeed.
223                Origin::Connect(_reason) => match expected_sync_direction(me, node) {
224                    SyncDirection::Accept => AcceptOutcome::Allow,
225                    SyncDirection::Connect => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
226                },
227            },
228        };
229        if let AcceptOutcome::Allow = outcome {
230            self.set_sync_running(Origin::Accept);
231        }
232        outcome
233    }
234
235    fn set_sync_running(&mut self, origin: Origin) {
236        self.state = SyncState::Running {
237            origin,
238            start: SystemTime::now(),
239        };
240        self.resync_requested = false;
241    }
242}
243
/// Which of two simultaneous sync attempts between the same pair of nodes should survive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SyncDirection {
    /// Keep the incoming connection: accept the peer's request.
    Accept,
    /// Keep our own outgoing connection: reject the peer's request.
    Connect,
}

250fn expected_sync_direction(self_node_id: &EndpointId, other_node_id: &EndpointId) -> SyncDirection {
251    if self_node_id.as_bytes() > other_node_id.as_bytes() {
252        SyncDirection::Accept
253    } else {
254        SyncDirection::Connect
255    }
256}