1use std::{
2 collections::BTreeMap,
3 time::{Instant, SystemTime},
4};
5
6use anyhow::Result;
7use iroh::EndpointId;
8use serde::{Deserialize, Serialize};
9use tracing::{debug, warn};
10
11use crate::{
12 net::{AbortReason, AcceptOutcome, SyncFinished},
13 NamespaceId,
14};
15
16#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
18pub enum SyncReason {
19 DirectJoin,
21 NewNeighbor,
23 SyncReport,
25 Resync,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
31pub enum Origin {
32 Connect(SyncReason),
34 Accept,
36}
37
38#[derive(Debug, Clone, Default)]
40pub enum SyncState {
41 #[default]
42 Idle,
43 Running {
44 start: SystemTime,
45 origin: Origin,
46 },
47}
48
49#[derive(Default)]
52pub struct NamespaceStates(BTreeMap<NamespaceId, NamespaceState>);
53
54#[derive(Default)]
55struct NamespaceState {
56 nodes: BTreeMap<EndpointId, PeerState>,
57 may_emit_ready: bool,
58}
59
60impl NamespaceStates {
61 pub fn is_syncing(&self, namespace: &NamespaceId) -> bool {
63 self.0.contains_key(namespace)
64 }
65
66 pub fn insert(&mut self, namespace: NamespaceId) {
68 self.0.entry(namespace).or_default();
69 }
70
71 pub fn start_connect(
75 &mut self,
76 namespace: &NamespaceId,
77 node: EndpointId,
78 reason: SyncReason,
79 ) -> bool {
80 match self.entry(namespace, node) {
81 None => {
82 debug!("abort connect: namespace is not in sync set");
83 false
84 }
85 Some(state) => state.start_connect(reason),
86 }
87 }
88
89 pub fn accept_request(
93 &mut self,
94 me: &EndpointId,
95 namespace: &NamespaceId,
96 node: EndpointId,
97 ) -> AcceptOutcome {
98 let Some(state) = self.entry(namespace, node) else {
99 return AcceptOutcome::Reject(AbortReason::NotFound);
100 };
101 state.accept_request(me, &node)
102 }
103
104 pub fn finish(
112 &mut self,
113 namespace: &NamespaceId,
114 node: EndpointId,
115 origin: &Origin,
116 result: Result<SyncFinished>,
117 ) -> Option<(SystemTime, bool)> {
118 let state = self.entry(namespace, node)?;
119 state.finish(origin, result)
120 }
121
122 pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> {
128 let state = self.0.get_mut(namespace)?;
129 state.may_emit_ready = value;
130 Some(())
131 }
132 pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option<bool> {
139 let state = self.0.get_mut(namespace)?;
140 if state.may_emit_ready {
141 state.may_emit_ready = false;
142 Some(true)
143 } else {
144 Some(false)
145 }
146 }
147
148 pub fn remove(&mut self, namespace: &NamespaceId) -> bool {
150 self.0.remove(namespace).is_some()
151 }
152
153 fn entry(&mut self, namespace: &NamespaceId, node: EndpointId) -> Option<&mut PeerState> {
157 self.0
158 .get_mut(namespace)
159 .map(|n| n.nodes.entry(node).or_default())
160 }
161}
162
163#[derive(Default)]
165struct PeerState {
166 state: SyncState,
167 resync_requested: bool,
168 last_sync: Option<(Instant, Result<SyncFinished>)>,
169}
170
171impl PeerState {
172 fn finish(
173 &mut self,
174 origin: &Origin,
175 result: Result<SyncFinished>,
176 ) -> Option<(SystemTime, bool)> {
177 let start = match &self.state {
178 SyncState::Running {
179 start,
180 origin: origin2,
181 } => {
182 if origin2 != origin {
183 warn!(actual = ?origin, expected = ?origin2, "finished sync origin does not match state")
184 }
185 Some(*start)
186 }
187 SyncState::Idle => {
188 warn!("sync state finish called but not in running state");
189 None
190 }
191 };
192
193 self.last_sync = Some((Instant::now(), result));
194 self.state = SyncState::Idle;
195 start.map(|s| (s, self.resync_requested))
196 }
197
198 fn start_connect(&mut self, reason: SyncReason) -> bool {
199 debug!(?reason, "start connect");
200 match self.state {
201 SyncState::Running { .. } => {
203 debug!("abort connect: sync already running");
204 if matches!(reason, SyncReason::SyncReport) {
205 debug!("resync queued");
206 self.resync_requested = true;
207 }
208 false
209 }
210 SyncState::Idle => {
211 self.set_sync_running(Origin::Connect(reason));
212 true
213 }
214 }
215 }
216
217 fn accept_request(&mut self, me: &EndpointId, node: &EndpointId) -> AcceptOutcome {
218 let outcome = match &self.state {
219 SyncState::Idle => AcceptOutcome::Allow,
220 SyncState::Running { origin, .. } => match origin {
221 Origin::Accept => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
222 Origin::Connect(_reason) => match expected_sync_direction(me, node) {
226 SyncDirection::Accept => AcceptOutcome::Allow,
227 SyncDirection::Connect => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
228 },
229 },
230 };
231 if let AcceptOutcome::Allow = outcome {
232 self.set_sync_running(Origin::Accept);
233 }
234 outcome
235 }
236
237 fn set_sync_running(&mut self, origin: Origin) {
238 self.state = SyncState::Running {
239 origin,
240 start: SystemTime::now(),
241 };
242 self.resync_requested = false;
243 }
244}
245
246#[derive(Debug)]
247enum SyncDirection {
248 Accept,
249 Connect,
250}
251
252fn expected_sync_direction(self_node_id: &EndpointId, other_node_id: &EndpointId) -> SyncDirection {
253 if self_node_id.as_bytes() > other_node_id.as_bytes() {
254 SyncDirection::Accept
255 } else {
256 SyncDirection::Connect
257 }
258}