1use std::collections::BTreeMap;
2
3use anyhow::Result;
4use iroh::EndpointId;
5use n0_future::time::{Instant, SystemTime};
6use serde::{Deserialize, Serialize};
7use tracing::{debug, warn};
8
9use crate::{
10 net::{AbortReason, AcceptOutcome, SyncFinished},
11 NamespaceId,
12};
13
14#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
16pub enum SyncReason {
17 DirectJoin,
19 NewNeighbor,
21 SyncReport,
23 Resync,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
29pub enum Origin {
30 Connect(SyncReason),
32 Accept,
34}
35
36#[derive(Default, Debug, Clone)]
38pub enum SyncState {
39 #[default]
40 Idle,
41 Running {
42 start: SystemTime,
43 origin: Origin,
44 },
45}
46
47#[derive(Default)]
50pub struct NamespaceStates(BTreeMap<NamespaceId, NamespaceState>);
51
52#[derive(Default)]
53struct NamespaceState {
54 nodes: BTreeMap<EndpointId, PeerState>,
55 may_emit_ready: bool,
56}
57
58impl NamespaceStates {
59 pub fn is_syncing(&self, namespace: &NamespaceId) -> bool {
61 self.0.contains_key(namespace)
62 }
63
64 pub fn insert(&mut self, namespace: NamespaceId) {
66 self.0.entry(namespace).or_default();
67 }
68
69 pub fn start_connect(
73 &mut self,
74 namespace: &NamespaceId,
75 node: EndpointId,
76 reason: SyncReason,
77 ) -> bool {
78 match self.entry(namespace, node) {
79 None => {
80 debug!("abort connect: namespace is not in sync set");
81 false
82 }
83 Some(state) => state.start_connect(reason),
84 }
85 }
86
87 pub fn accept_request(
91 &mut self,
92 me: &EndpointId,
93 namespace: &NamespaceId,
94 node: EndpointId,
95 ) -> AcceptOutcome {
96 let Some(state) = self.entry(namespace, node) else {
97 return AcceptOutcome::Reject(AbortReason::NotFound);
98 };
99 state.accept_request(me, &node)
100 }
101
102 pub fn finish(
110 &mut self,
111 namespace: &NamespaceId,
112 node: EndpointId,
113 origin: &Origin,
114 result: Result<SyncFinished>,
115 ) -> Option<(SystemTime, bool)> {
116 let state = self.entry(namespace, node)?;
117 state.finish(origin, result)
118 }
119
120 pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> {
126 let state = self.0.get_mut(namespace)?;
127 state.may_emit_ready = value;
128 Some(())
129 }
130 pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option<bool> {
137 let state = self.0.get_mut(namespace)?;
138 if state.may_emit_ready {
139 state.may_emit_ready = false;
140 Some(true)
141 } else {
142 Some(false)
143 }
144 }
145
146 pub fn remove(&mut self, namespace: &NamespaceId) -> bool {
148 self.0.remove(namespace).is_some()
149 }
150
151 fn entry(&mut self, namespace: &NamespaceId, node: EndpointId) -> Option<&mut PeerState> {
155 self.0
156 .get_mut(namespace)
157 .map(|n| n.nodes.entry(node).or_default())
158 }
159}
160
161#[derive(Default)]
163struct PeerState {
164 state: SyncState,
165 resync_requested: bool,
166 last_sync: Option<(Instant, Result<SyncFinished>)>,
167}
168
169impl PeerState {
170 fn finish(
171 &mut self,
172 origin: &Origin,
173 result: Result<SyncFinished>,
174 ) -> Option<(SystemTime, bool)> {
175 let start = match &self.state {
176 SyncState::Running {
177 start,
178 origin: origin2,
179 } => {
180 if origin2 != origin {
181 warn!(actual = ?origin, expected = ?origin2, "finished sync origin does not match state")
182 }
183 Some(*start)
184 }
185 SyncState::Idle => {
186 warn!("sync state finish called but not in running state");
187 None
188 }
189 };
190
191 self.last_sync = Some((Instant::now(), result));
192 self.state = SyncState::Idle;
193 start.map(|s| (s, self.resync_requested))
194 }
195
196 fn start_connect(&mut self, reason: SyncReason) -> bool {
197 debug!(?reason, "start connect");
198 match self.state {
199 SyncState::Running { .. } => {
201 debug!("abort connect: sync already running");
202 if matches!(reason, SyncReason::SyncReport) {
203 debug!("resync queued");
204 self.resync_requested = true;
205 }
206 false
207 }
208 SyncState::Idle => {
209 self.set_sync_running(Origin::Connect(reason));
210 true
211 }
212 }
213 }
214
215 fn accept_request(&mut self, me: &EndpointId, node: &EndpointId) -> AcceptOutcome {
216 let outcome = match &self.state {
217 SyncState::Idle => AcceptOutcome::Allow,
218 SyncState::Running { origin, .. } => match origin {
219 Origin::Accept => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
220 Origin::Connect(_reason) => match expected_sync_direction(me, node) {
224 SyncDirection::Accept => AcceptOutcome::Allow,
225 SyncDirection::Connect => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
226 },
227 },
228 };
229 if let AcceptOutcome::Allow = outcome {
230 self.set_sync_running(Origin::Accept);
231 }
232 outcome
233 }
234
235 fn set_sync_running(&mut self, origin: Origin) {
236 self.state = SyncState::Running {
237 origin,
238 start: SystemTime::now(),
239 };
240 self.resync_requested = false;
241 }
242}
243
/// The role we are expected to take when both peers try to sync with each other at once.
#[derive(Debug)]
enum SyncDirection {
    /// We are expected to accept the remote's incoming request.
    Accept,
    /// We are expected to be the connecting side.
    Connect,
}
249
250fn expected_sync_direction(self_node_id: &EndpointId, other_node_id: &EndpointId) -> SyncDirection {
251 if self_node_id.as_bytes() > other_node_id.as_bytes() {
252 SyncDirection::Accept
253 } else {
254 SyncDirection::Connect
255 }
256}