1use std::{
2 collections::BTreeMap,
3 time::{Instant, SystemTime},
4};
5
6use anyhow::Result;
7use iroh::NodeId;
8use serde::{Deserialize, Serialize};
9use tracing::{debug, warn};
10
11use crate::{
12 net::{AbortReason, AcceptOutcome, SyncFinished},
13 NamespaceId,
14};
15
16#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
18pub enum SyncReason {
19 DirectJoin,
21 NewNeighbor,
23 SyncReport,
25 Resync,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
31pub enum Origin {
32 Connect(SyncReason),
34 Accept,
36}
37
38#[derive(Debug, Clone)]
40pub enum SyncState {
41 Idle,
42 Running { start: SystemTime, origin: Origin },
43}
44
45impl Default for SyncState {
46 fn default() -> Self {
47 Self::Idle
48 }
49}
50
51#[derive(Default)]
54pub struct NamespaceStates(BTreeMap<NamespaceId, NamespaceState>);
55
56#[derive(Default)]
57struct NamespaceState {
58 nodes: BTreeMap<NodeId, PeerState>,
59 may_emit_ready: bool,
60}
61
62impl NamespaceStates {
63 pub fn is_syncing(&self, namespace: &NamespaceId) -> bool {
65 self.0.contains_key(namespace)
66 }
67
68 pub fn insert(&mut self, namespace: NamespaceId) {
70 self.0.entry(namespace).or_default();
71 }
72
73 pub fn start_connect(
77 &mut self,
78 namespace: &NamespaceId,
79 node: NodeId,
80 reason: SyncReason,
81 ) -> bool {
82 match self.entry(namespace, node) {
83 None => {
84 debug!("abort connect: namespace is not in sync set");
85 false
86 }
87 Some(state) => state.start_connect(reason),
88 }
89 }
90
91 pub fn accept_request(
95 &mut self,
96 me: &NodeId,
97 namespace: &NamespaceId,
98 node: NodeId,
99 ) -> AcceptOutcome {
100 let Some(state) = self.entry(namespace, node) else {
101 return AcceptOutcome::Reject(AbortReason::NotFound);
102 };
103 state.accept_request(me, &node)
104 }
105
106 pub fn finish(
114 &mut self,
115 namespace: &NamespaceId,
116 node: NodeId,
117 origin: &Origin,
118 result: Result<SyncFinished>,
119 ) -> Option<(SystemTime, bool)> {
120 let state = self.entry(namespace, node)?;
121 state.finish(origin, result)
122 }
123
124 pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> {
130 let state = self.0.get_mut(namespace)?;
131 state.may_emit_ready = value;
132 Some(())
133 }
134 pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option<bool> {
141 let state = self.0.get_mut(namespace)?;
142 if state.may_emit_ready {
143 state.may_emit_ready = false;
144 Some(true)
145 } else {
146 Some(false)
147 }
148 }
149
150 pub fn remove(&mut self, namespace: &NamespaceId) -> bool {
152 self.0.remove(namespace).is_some()
153 }
154
155 fn entry(&mut self, namespace: &NamespaceId, node: NodeId) -> Option<&mut PeerState> {
159 self.0
160 .get_mut(namespace)
161 .map(|n| n.nodes.entry(node).or_default())
162 }
163}
164
165#[derive(Default)]
167struct PeerState {
168 state: SyncState,
169 resync_requested: bool,
170 last_sync: Option<(Instant, Result<SyncFinished>)>,
171}
172
173impl PeerState {
174 fn finish(
175 &mut self,
176 origin: &Origin,
177 result: Result<SyncFinished>,
178 ) -> Option<(SystemTime, bool)> {
179 let start = match &self.state {
180 SyncState::Running {
181 start,
182 origin: origin2,
183 } => {
184 if origin2 != origin {
185 warn!(actual = ?origin, expected = ?origin2, "finished sync origin does not match state")
186 }
187 Some(*start)
188 }
189 SyncState::Idle => {
190 warn!("sync state finish called but not in running state");
191 None
192 }
193 };
194
195 self.last_sync = Some((Instant::now(), result));
196 self.state = SyncState::Idle;
197 start.map(|s| (s, self.resync_requested))
198 }
199
200 fn start_connect(&mut self, reason: SyncReason) -> bool {
201 debug!(?reason, "start connect");
202 match self.state {
203 SyncState::Running { .. } => {
205 debug!("abort connect: sync already running");
206 if matches!(reason, SyncReason::SyncReport) {
207 debug!("resync queued");
208 self.resync_requested = true;
209 }
210 false
211 }
212 SyncState::Idle => {
213 self.set_sync_running(Origin::Connect(reason));
214 true
215 }
216 }
217 }
218
219 fn accept_request(&mut self, me: &NodeId, node: &NodeId) -> AcceptOutcome {
220 let outcome = match &self.state {
221 SyncState::Idle => AcceptOutcome::Allow,
222 SyncState::Running { origin, .. } => match origin {
223 Origin::Accept => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
224 Origin::Connect(_reason) => match expected_sync_direction(me, node) {
228 SyncDirection::Accept => AcceptOutcome::Allow,
229 SyncDirection::Connect => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
230 },
231 },
232 };
233 if let AcceptOutcome::Allow = outcome {
234 self.set_sync_running(Origin::Accept);
235 }
236 outcome
237 }
238
239 fn set_sync_running(&mut self, origin: Origin) {
240 self.state = SyncState::Running {
241 origin,
242 start: SystemTime::now(),
243 };
244 self.resync_requested = false;
245 }
246}
247
248#[derive(Debug)]
249enum SyncDirection {
250 Accept,
251 Connect,
252}
253
254fn expected_sync_direction(self_node_id: &NodeId, other_node_id: &NodeId) -> SyncDirection {
255 if self_node_id.as_bytes() > other_node_id.as_bytes() {
256 SyncDirection::Accept
257 } else {
258 SyncDirection::Connect
259 }
260}