n0_watcher/
lib.rs

1//! Watchable values.
2//!
3//! A [`Watchable`] exists to keep track of a value which may change over time.  It allows
4//! observers to be notified of changes to the value.  The aim is to always be aware of the
5//! **last** value, not to observe *every* value change.
6//!
7//! The reason for this is ergonomics and predictable resource usage: Requiring every
8//! intermediate value to be observable would mean that either the side that sets new values
9//! using [`Watchable::set`] would need to wait for all "receivers" of these intermediate
10//! values to catch up and thus be an async operation, or it would require the receivers
11//! to buffer intermediate values until they've been "received" on the [`Watcher`]s with
12//! an unlimited buffer size and thus potentially unlimited memory growth.
13//!
14//! # Example
15//!
16//! ```
17//! use n0_future::StreamExt;
18//! use n0_watcher::{Watchable, Watcher as _};
19//!
20//! #[tokio::main(flavor = "current_thread", start_paused = true)]
21//! async fn main() {
22//!     let watchable = Watchable::new(None);
23//!
24//!     // A task that waits for the watcher to be initialized to Some(value) before printing it
25//!     let mut watcher = watchable.watch();
26//!     tokio::spawn(async move {
27//!         let initialized_value = watcher.initialized().await;
28//!         println!("initialized: {initialized_value}");
29//!     });
30//!
31//!     // A task that prints every update to the watcher since the initial one:
32//!     let mut updates = watchable.watch().stream_updates_only();
33//!     tokio::spawn(async move {
34//!         while let Some(update) = updates.next().await {
35//!             println!("update: {update:?}");
36//!         }
37//!     });
38//!
39//!     // A task that prints the current value and then every update it can catch,
40//!     // but it also does something else which makes it very slow to pick up new
41//!     // values, so it'll skip some:
42//!     let mut current_and_updates = watchable.watch().stream();
43//!     tokio::spawn(async move {
44//!         while let Some(update) = current_and_updates.next().await {
45//!             println!("update2: {update:?}");
46//!             tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
47//!         }
48//!     });
49//!
50//!     for i in 0..20 {
51//!         println!("Setting watchable to {i}");
52//!         watchable.set(Some(i)).ok();
53//!         tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
54//!     }
55//! }
56//! ```
57//!
58//! # Similar but different
59//!
60//! - `async_channel`: This is a multi-producer, multi-consumer channel implementation.
61//!   Only at most one consumer will receive each "produced" value.
62//!   What we want is to have every "produced" value to be "broadcast" to every receiver.
63//! - `tokio::broadcast`: Also a multi-producer, multi-consumer channel implementation.
64//!   This is very similar to this crate (`tokio::broadcast::Sender` is like [`Watchable`]
65//!   and `tokio::broadcast::Receiver` is like [`Watcher`]), but you can't get the latest
66//!   value without `.await`ing on the receiver, and it'll internally store a queue of
67//!   intermediate values.
68//! - `tokio::watch`: Also a MPSC channel, and unlike `tokio::broadcast` only retains the
69//!   latest value. That module has pretty much the same purpose as this crate, but doesn't
70//!   implement a poll-based method of getting updates and doesn't implement combinators.
71//! - [`std::sync::RwLock`]: (wrapped in an [`std::sync::Arc`]) This allows you access
72//!   to the latest values, but might block while it's being set (but that could be short
73//!   enough not to matter for async rust purposes).
74//!   This doesn't allow you to be notified whenever a new value is written.
75//! - The `watchable` crate: We used to use this crate at n0, but we wanted to experiment
76//!   with different APIs and needed Wasm support.
77#[cfg(not(watcher_loom))]
78use std::sync;
79use std::{
80    collections::VecDeque,
81    future::Future,
82    pin::Pin,
83    sync::{Arc, RwLockReadGuard, Weak},
84    task::{self, ready, Poll, Waker},
85};
86
87#[cfg(watcher_loom)]
88use loom::sync;
89use n0_error::StackError;
90use sync::{Mutex, RwLock};
91
92/// A wrapper around a value that notifies [`Watcher`]s when the value is modified.
93///
94/// Only the most recent value is available to any observer, but the observer is guaranteed
95/// to be notified of the most recent value.
96#[derive(Debug, Default)]
97pub struct Watchable<T> {
98    shared: Arc<Shared<T>>,
99}
100
101impl<T> Clone for Watchable<T> {
102    fn clone(&self) -> Self {
103        Self {
104            shared: self.shared.clone(),
105        }
106    }
107}
108
109/// Abstracts over `Option<T>` and `Vec<T>`
110pub trait Nullable<T> {
111    /// Converts this value into an `Option`.
112    fn into_option(self) -> Option<T>;
113}
114
115impl<T> Nullable<T> for Option<T> {
116    fn into_option(self) -> Option<T> {
117        self
118    }
119}
120
121impl<T> Nullable<T> for Vec<T> {
122    fn into_option(mut self) -> Option<T> {
123        self.pop()
124    }
125}
126
127impl<T: Clone + Eq> Watchable<T> {
128    /// Creates a [`Watchable`] initialized to given value.
129    pub fn new(value: T) -> Self {
130        Self {
131            shared: Arc::new(Shared {
132                state: RwLock::new(State {
133                    value,
134                    epoch: INITIAL_EPOCH,
135                }),
136                wakers: Default::default(),
137            }),
138        }
139    }
140
141    /// Sets a new value.
142    ///
143    /// Returns `Ok(previous_value)` if the value was different from the one set, or
144    /// returns the provided value back as `Err(value)` if the value didn't change.
145    ///
146    /// Watchers are only notified if the value changed.
147    pub fn set(&self, value: T) -> Result<T, T> {
148        // We don't actually write when the value didn't change, but there's unfortunately
149        // no way to upgrade a read guard to a write guard, and locking as read first, then
150        // dropping and locking as write introduces a possible race condition.
151        let mut state = self.shared.state.write().expect("poisoned");
152
153        // Find out if the value changed
154        let changed = state.value != value;
155
156        let ret = if changed {
157            let old = std::mem::replace(&mut state.value, value);
158            state.epoch += 1;
159            Ok(old)
160        } else {
161            Err(value)
162        };
163        drop(state); // No need to write anymore
164
165        // Notify watchers
166        if changed {
167            for watcher in self.shared.wakers.lock().expect("poisoned").drain(..) {
168                watcher.wake();
169            }
170        }
171        ret
172    }
173
174    /// Atomically modifies the value using a closure.
175    ///
176    /// The closure receives a mutable reference to the current value and may modify
177    /// it in place. The read-modify-write is performed under a write lock, so it
178    /// happens atomically with respect to other callers of [`Watchable::set`],
179    /// [`Watchable::modify`], [`Watchable::get`], and [`Watcher`]s. The closure
180    /// should therefore be fast and must not call back into this `Watchable`
181    /// (doing so will deadlock).
182    ///
183    /// Watchers are only notified if the value actually changed, as determined by
184    /// `Eq`. Whatever the closure returns is forwarded as the return value of this
185    /// method, which makes it convenient for expressing compare-and-swap:
186    ///
187    /// ```
188    /// # use n0_watcher::Watchable;
189    /// let watchable = Watchable::new(42);
190    ///
191    /// let swapped = watchable.modify(|value| {
192    ///     if *value == 42 {
193    ///         *value = 100;
194    ///         true
195    ///     } else {
196    ///         false
197    ///     }
198    /// });
199    /// assert!(swapped);
200    /// assert_eq!(watchable.get(), 100);
201    /// ```
202    pub fn modify<F, R>(&self, f: F) -> R
203    where
204        F: FnOnce(&mut T) -> R,
205    {
206        let mut state = self.shared.state.write().expect("poisoned");
207        let old = state.value.clone();
208        let ret = f(&mut state.value);
209        let changed = old != state.value;
210        if changed {
211            state.epoch += 1;
212        }
213        drop(state);
214
215        if changed {
216            for watcher in self.shared.wakers.lock().expect("poisoned").drain(..) {
217                watcher.wake();
218            }
219        }
220        ret
221    }
222
223    /// Creates a [`Direct`] [`Watcher`], allowing the value to be observed, but not modified.
224    pub fn watch(&self) -> Direct<T> {
225        Direct {
226            state: self.shared.state().clone(),
227            shared: Some(Arc::downgrade(&self.shared)),
228        }
229    }
230
231    /// Returns the currently stored value.
232    pub fn get(&self) -> T {
233        self.shared.get()
234    }
235
236    /// Returns true when there are any watchers actively listening on changes,
237    /// or false when all watchers have been dropped or none have been created yet.
238    pub fn has_watchers(&self) -> bool {
239        // `Watchable`s will increase the strong count
240        // `Direct`s watchers (which all watchers descend from) will increase the weak count
241        Arc::weak_count(&self.shared) != 0
242    }
243}
244
245impl<T> Drop for Shared<T> {
246    fn drop(&mut self) {
247        let Ok(mut watchers) = self.wakers.lock() else {
248            return; // Poisoned waking?
249        };
250        // Wake all watchers once we drop Shared (this happens when
251        // the last `Watchable` is dropped).
252        // This allows us to notify `NextFut::poll`s and have that
253        // return `Disconnected`.
254        for watcher in watchers.drain(..) {
255            watcher.wake();
256        }
257    }
258}
259
260/// A handle to a value that's represented by one or more underlying [`Watchable`]s.
261///
262/// A [`Watcher`] can get the current value, and will be notified when the value changes.
263/// Only the most recent value is accessible, and if the threads with the underlying [`Watchable`]s
264/// change the value faster than the threads with the [`Watcher`] can keep up with, then
265/// it'll miss in-between values.
266/// When the thread changing the [`Watchable`] pauses updating, the [`Watcher`] will always
267/// end up reporting the most recent state eventually.
268///
269/// Watchers can be modified via [`Watcher::map`] to observe a value derived from the original
270/// value via a function.
271///
272/// Watchers can be combined via [`Watcher::or`] to allow observing multiple values at once and
273/// getting an update in case any of the values updates.
274///
275/// One of the underlying [`Watchable`]s might already be dropped. In that case,
276/// the watcher will be "disconnected" and return [`Err(Disconnected)`](Disconnected)
277/// on some function calls or, when turned into a stream, that stream will end.
278/// This property can also be checked with [`Watcher::is_connected`].
279pub trait Watcher: Clone {
280    /// The type of value that can change.
281    ///
282    /// We require `Clone`, because we need to be able to make
283    /// the values have a lifetime that's detached from the original [`Watchable`]'s
284    /// lifetime.
285    ///
286    /// We require `Eq`, to be able to check whether the value actually changed or
287    /// not, so we can notify or not notify accordingly.
288    type Value: Clone + Eq;
289
290    /// Updates the watcher to the latest value and returns that value.
291    ///
292    /// If any of the underlying [`Watchable`] values have been dropped, then this
293    /// might return an outdated value for that watchable, specifically, the latest
294    /// value that was fetched for that watchable, as opposed to the latest value
295    /// that was set on the watchable before it was dropped.
296    ///
297    /// The default implementation for this is simply
298    /// ```ignore
299    /// fn get(&mut self) -> Self::Value {
300    ///     self.update();
301    ///     self.peek().clone()
302    /// }
303    /// ```
304    fn get(&mut self) -> Self::Value {
305        self.update();
306        self.peek().clone()
307    }
308
309    /// Updates the watcher to the latest value and returns whether it changed.
310    ///
311    /// Watchers keep track of the "latest known" value they fetched.
312    /// This function updates that internal value by looking up the latest value
313    /// at the [`Watchable`]\(s\) that this watcher is linked to.
314    fn update(&mut self) -> bool;
315
316    /// Returns a reference to the value currently stored in the watcher.
317    ///
318    /// Watchers keep track of the "latest known" value they fetched.
319    /// Calling this won't update the latest value, unlike [`Watcher::get`] or
320    /// [`Watcher::update`].
321    ///
322    /// This can be useful if you want to avoid copying out the internal value
323    /// frequently like what [`Watcher::get`] will end up doing.
324    fn peek(&self) -> &Self::Value;
325
326    /// Whether this watcher is still connected to all of its underlying [`Watchable`]s.
327    ///
328    /// Returns false when any of the underlying watchables has been dropped.
329    fn is_connected(&self) -> bool;
330
331    /// Polls for the next value, or returns [`Disconnected`] if one of the underlying
332    /// [`Watchable`]s has been dropped.
333    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>>;
334
335    /// Returns a future completing with `Ok(value)` once a new value is set, or with
336    /// [`Err(Disconnected)`](Disconnected) if the connected [`Watchable`] was dropped.
337    ///
338    /// # Cancel Safety
339    ///
340    /// The returned future is cancel-safe.
341    fn updated(&mut self) -> NextFut<'_, Self> {
342        NextFut { watcher: self }
343    }
344
345    /// Returns a future completing once the value is set to [`Some`] value.
346    ///
347    /// If the current value is [`Some`] value, this future will resolve immediately.
348    ///
349    /// This is a utility for the common case of storing an [`Option`] inside a
350    /// [`Watchable`].
351    ///
352    /// # Cancel Safety
353    ///
354    /// The returned future is cancel-safe.
355    fn initialized<T, W>(&mut self) -> InitializedFut<'_, T, W, Self>
356    where
357        W: Nullable<T> + Clone,
358        Self: Watcher<Value = W>,
359    {
360        InitializedFut {
361            initial: self.get().into_option(),
362            watcher: self,
363        }
364    }
365
366    /// Returns a stream which will yield the most recent values as items.
367    ///
368    /// The first item of the stream is the current value, so that this stream can be easily
369    /// used to operate on the most recent value.
370    ///
371    /// Note however, that only the last item is stored.  If the stream is not polled when an
372    /// item is available it can be replaced with another item by the time it is polled.
373    ///
374    /// This stream ends once the original [`Watchable`] has been dropped.
375    ///
376    /// # Cancel Safety
377    ///
378    /// The returned stream is cancel-safe.
379    fn stream(mut self) -> Stream<Self>
380    where
381        Self: Unpin,
382    {
383        Stream {
384            initial: Some(self.get()),
385            watcher: self,
386        }
387    }
388
389    /// Returns a stream which will yield the most recent values as items, starting from
390    /// the next unobserved future value.
391    ///
392    /// This means this stream will only yield values when the watched value changes,
393    /// the value stored at the time the stream is created is not yielded.
394    ///
395    /// Note however, that only the last item is stored.  If the stream is not polled when an
396    /// item is available it can be replaced with another item by the time it is polled.
397    ///
398    /// This stream ends once the original [`Watchable`] has been dropped.
399    ///
400    /// # Cancel Safety
401    ///
402    /// The returned stream is cancel-safe.
403    fn stream_updates_only(self) -> Stream<Self>
404    where
405        Self: Unpin,
406    {
407        Stream {
408            initial: None,
409            watcher: self,
410        }
411    }
412
413    /// Maps this watcher with a function that transforms the observed values.
414    ///
415    /// The returned watcher will only register updates, when the *mapped* value
416    /// observably changes.
417    fn map<T: Clone + Eq>(
418        mut self,
419        map: impl Fn(Self::Value) -> T + Send + Sync + 'static,
420    ) -> Map<Self, T> {
421        Map {
422            current: (map)(self.get()),
423            map: Arc::new(map),
424            watcher: self,
425        }
426    }
427
428    /// Returns a watcher that updates every time this or the other watcher
429    /// updates, and yields both watcher's items together when that happens.
430    fn or<W: Watcher>(self, other: W) -> Tuple<Self, W> {
431        Tuple::new(self, other)
432    }
433}
434
435/// The immediate, direct observer of a [`Watchable`] value.
436///
437/// This type is mainly used via the [`Watcher`] interface.
438#[derive(Debug, Clone)]
439pub struct Direct<T> {
440    state: State<T>,
441    // We wrap the Weak with an Option, so that we can set it to `None` once we
442    // notice that Weak is not upgradable anymore for the first time.
443    // This allows the weak pointer's allocation to be freed in case this makes
444    // the weak count go to zero (even if Direct is still kept around).
445    shared: Option<Weak<Shared<T>>>,
446}
447
448impl<T: Clone + Eq> Watcher for Direct<T> {
449    type Value = T;
450
451    fn update(&mut self) -> bool {
452        let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
453            self.shared = None; // Weak won't be upgradable in the future, this way we can allow the allocation to be freed
454            return false;
455        };
456        let state = shared.state();
457        if state.epoch > self.state.epoch {
458            self.state = state.clone();
459            true
460        } else {
461            false
462        }
463    }
464
465    fn peek(&self) -> &Self::Value {
466        &self.state.value
467    }
468
469    fn is_connected(&self) -> bool {
470        self.shared
471            .as_ref()
472            .and_then(|weak| weak.upgrade())
473            .is_some()
474    }
475
476    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
477        let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
478            self.shared = None; // Weak won't be upgradable in the future, this way we can allow the allocation to be freed
479            return Poll::Ready(Err(Disconnected));
480        };
481        self.state = ready!(shared.poll_updated(cx, self.state.epoch));
482        Poll::Ready(Ok(()))
483    }
484}
485
486#[derive(Debug, Clone)]
487pub struct Tuple<S: Watcher, T: Watcher> {
488    inner: (S, T),
489    current: (S::Value, T::Value),
490}
491
492impl<S: Watcher, T: Watcher> Tuple<S, T> {
493    pub fn new(mut s: S, mut t: T) -> Self {
494        let current = (s.get(), t.get());
495        Self {
496            inner: (s, t),
497            current,
498        }
499    }
500}
501
502impl<S: Watcher, T: Watcher> Watcher for Tuple<S, T> {
503    type Value = (S::Value, T::Value);
504
505    fn update(&mut self) -> bool {
506        // We need to update all watchers! So don't early-return
507        let s_updated = self.inner.0.update();
508        let t_updated = self.inner.1.update();
509        let updated = s_updated || t_updated;
510        if updated {
511            self.current = (self.inner.0.peek().clone(), self.inner.1.peek().clone());
512        }
513        updated
514    }
515
516    fn peek(&self) -> &Self::Value {
517        &self.current
518    }
519
520    fn is_connected(&self) -> bool {
521        self.inner.0.is_connected() && self.inner.1.is_connected()
522    }
523
524    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
525        let poll_0 = self.inner.0.poll_updated(cx)?;
526        let poll_1 = self.inner.1.poll_updated(cx)?;
527        if poll_0.is_pending() && poll_1.is_pending() {
528            return Poll::Pending;
529        }
530        if poll_0.is_ready() {
531            self.current.0 = self.inner.0.peek().clone();
532        }
533        if poll_1.is_ready() {
534            self.current.1 = self.inner.1.peek().clone();
535        }
536        Poll::Ready(Ok(()))
537    }
538}
539
540#[derive(Debug, Clone)]
541pub struct Triple<S: Watcher, T: Watcher, U: Watcher> {
542    inner: (S, T, U),
543    current: (S::Value, T::Value, U::Value),
544}
545
546impl<S: Watcher, T: Watcher, U: Watcher> Triple<S, T, U> {
547    pub fn new(mut s: S, mut t: T, mut u: U) -> Self {
548        let current = (s.get(), t.get(), u.get());
549        Self {
550            inner: (s, t, u),
551            current,
552        }
553    }
554}
555
556impl<S: Watcher, T: Watcher, U: Watcher> Watcher for Triple<S, T, U> {
557    type Value = (S::Value, T::Value, U::Value);
558
559    fn update(&mut self) -> bool {
560        // We need to update all watchers! So don't early-return
561        let s_updated = self.inner.0.update();
562        let t_updated = self.inner.1.update();
563        let u_updated = self.inner.2.update();
564        let updated = s_updated || t_updated || u_updated;
565        if updated {
566            self.current = (
567                self.inner.0.peek().clone(),
568                self.inner.1.peek().clone(),
569                self.inner.2.peek().clone(),
570            );
571        }
572        updated
573    }
574
575    fn peek(&self) -> &Self::Value {
576        &self.current
577    }
578
579    fn is_connected(&self) -> bool {
580        self.inner.0.is_connected() && self.inner.1.is_connected() && self.inner.2.is_connected()
581    }
582
583    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
584        let poll_0 = self.inner.0.poll_updated(cx)?;
585        let poll_1 = self.inner.1.poll_updated(cx)?;
586        let poll_2 = self.inner.2.poll_updated(cx)?;
587
588        if poll_0.is_pending() && poll_1.is_pending() && poll_2.is_pending() {
589            return Poll::Pending;
590        }
591        if poll_0.is_ready() {
592            self.current.0 = self.inner.0.peek().clone();
593        }
594        if poll_1.is_ready() {
595            self.current.1 = self.inner.1.peek().clone();
596        }
597        if poll_2.is_ready() {
598            self.current.2 = self.inner.2.peek().clone();
599        }
600        Poll::Ready(Ok(()))
601    }
602}
603
604/// Combinator to join two watchers
605#[derive(Debug, Clone)]
606pub struct Join<T: Clone + Eq, W: Watcher<Value = T>> {
607    // invariant: watchers.len() == current.len()
608    watchers: Vec<W>,
609    current: Vec<T>,
610}
611
612impl<T: Clone + Eq, W: Watcher<Value = T>> Join<T, W> {
613    /// Joins a set of watchers into a single watcher
614    pub fn new(watchers: impl Iterator<Item = W>) -> Self {
615        let mut watchers: Vec<W> = watchers.into_iter().collect();
616
617        let mut current = Vec::with_capacity(watchers.len());
618        for watcher in &mut watchers {
619            current.push(watcher.get());
620        }
621        Self { watchers, current }
622    }
623}
624
625impl<T: Clone + Eq, W: Watcher<Value = T>> Watcher for Join<T, W> {
626    type Value = Vec<T>;
627
628    fn update(&mut self) -> bool {
629        let mut any_updated = false;
630        for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
631            if watcher.update() {
632                any_updated = true;
633                *value = watcher.peek().clone();
634            }
635        }
636        any_updated
637    }
638
639    fn peek(&self) -> &Self::Value {
640        &self.current
641    }
642
643    fn is_connected(&self) -> bool {
644        self.watchers.iter().all(|w| w.is_connected())
645    }
646
647    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
648        let mut any_updated = false;
649        for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
650            if watcher.poll_updated(cx)?.is_ready() {
651                any_updated = true;
652                *value = watcher.peek().clone();
653            }
654        }
655
656        if any_updated {
657            Poll::Ready(Ok(()))
658        } else {
659            Poll::Pending
660        }
661    }
662}
663
664/// Wraps a [`Watcher`] to allow observing a derived value.
665///
666/// See [`Watcher::map`].
667#[derive(derive_more::Debug, Clone)]
668pub struct Map<W: Watcher, T: Clone + Eq> {
669    #[debug("Arc<dyn Fn(W::Value) -> T>")]
670    map: Arc<dyn Fn(W::Value) -> T + Send + Sync + 'static>,
671    watcher: W,
672    current: T,
673}
674
675impl<W: Watcher, T: Clone + Eq> Watcher for Map<W, T> {
676    type Value = T;
677
678    fn update(&mut self) -> bool {
679        if self.watcher.update() {
680            let new = (self.map)(self.watcher.peek().clone());
681            if new != self.current {
682                self.current = new;
683                true
684            } else {
685                false
686            }
687        } else {
688            false
689        }
690    }
691
692    fn peek(&self) -> &Self::Value {
693        &self.current
694    }
695
696    fn is_connected(&self) -> bool {
697        self.watcher.is_connected()
698    }
699
700    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
701        loop {
702            ready!(self.watcher.poll_updated(cx)?);
703            let new = (self.map)(self.watcher.peek().clone());
704            if new != self.current {
705                self.current = new;
706                return Poll::Ready(Ok(()));
707            }
708        }
709    }
710}
711
712/// Future returning the next item after the current one in a [`Watcher`].
713///
714/// See [`Watcher::updated`].
715///
716/// # Cancel Safety
717///
718/// This future is cancel-safe.
719#[derive(Debug)]
720pub struct NextFut<'a, W: Watcher> {
721    watcher: &'a mut W,
722}
723
724impl<W: Watcher> Future for NextFut<'_, W> {
725    type Output = Result<W::Value, Disconnected>;
726
727    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
728        ready!(self.watcher.poll_updated(cx))?;
729        Poll::Ready(Ok(self.watcher.peek().clone()))
730    }
731}
732
733/// Future returning the current or next value that's [`Some`] value.
734/// in a [`Watcher`].
735///
736/// See [`Watcher::initialized`].
737///
738/// # Cancel Safety
739///
740/// This Future is cancel-safe.
741#[derive(Debug)]
742pub struct InitializedFut<'a, T, V: Nullable<T> + Clone, W: Watcher<Value = V>> {
743    initial: Option<T>,
744    watcher: &'a mut W,
745}
746
747impl<T: Clone + Eq + Unpin, V: Nullable<T> + Clone, W: Watcher<Value = V> + Unpin> Future
748    for InitializedFut<'_, T, V, W>
749{
750    type Output = T;
751
752    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
753        let mut this = self.as_mut();
754        if let Some(value) = this.initial.take() {
755            return Poll::Ready(value);
756        }
757        loop {
758            if ready!(this.watcher.poll_updated(cx)).is_err() {
759                // The value will never be initialized
760                return Poll::Pending;
761            };
762            let value = this.watcher.peek();
763            if let Some(value) = value.clone().into_option() {
764                return Poll::Ready(value);
765            }
766        }
767    }
768}
769
770/// A stream for a [`Watcher`]'s next values.
771///
772/// See [`Watcher::stream`] and [`Watcher::stream_updates_only`].
773///
774/// # Cancel Safety
775///
776/// This stream is cancel-safe.
777#[derive(Debug, Clone)]
778pub struct Stream<W: Watcher + Unpin> {
779    initial: Option<W::Value>,
780    watcher: W,
781}
782
783impl<W: Watcher + Unpin> n0_future::Stream for Stream<W>
784where
785    W::Value: Unpin,
786{
787    type Item = W::Value;
788
789    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
790        if let Some(value) = self.as_mut().initial.take() {
791            return Poll::Ready(Some(value));
792        }
793        match self.as_mut().watcher.poll_updated(cx) {
794            Poll::Ready(Ok(())) => Poll::Ready(Some(self.as_ref().watcher.peek().clone())),
795            Poll::Ready(Err(Disconnected)) => Poll::Ready(None),
796            Poll::Pending => Poll::Pending,
797        }
798    }
799}
800
801/// The error for when a [`Watcher`] is disconnected from its underlying
802/// [`Watchable`] value, because of that watchable having been dropped.
803#[derive(StackError)]
804#[error("Watcher lost connection to underlying Watchable, it was dropped")]
805pub struct Disconnected;
806
807// Private:
808
809const INITIAL_EPOCH: u64 = 1;
810
811/// The shared state for a [`Watchable`].
812#[derive(Debug, Default)]
813struct Shared<T> {
814    /// The value to be watched and its current epoch.
815    state: RwLock<State<T>>,
816    wakers: Mutex<VecDeque<Waker>>,
817}
818
819#[derive(Debug, Clone)]
820struct State<T> {
821    value: T,
822    epoch: u64,
823}
824
825impl<T: Default> Default for State<T> {
826    fn default() -> Self {
827        Self {
828            value: Default::default(),
829            epoch: INITIAL_EPOCH,
830        }
831    }
832}
833
834impl<T: Clone> Shared<T> {
835    fn get(&self) -> T {
836        self.state.read().expect("poisoned").value.clone()
837    }
838
839    fn state(&self) -> RwLockReadGuard<'_, State<T>> {
840        self.state.read().expect("poisoned")
841    }
842
843    fn poll_updated(&self, cx: &mut task::Context<'_>, last_epoch: u64) -> Poll<State<T>> {
844        {
845            let state = self.state();
846
847            // We might get spurious wakeups due to e.g. a second-to-last Watchable being dropped.
848            // This makes sure we don't accidentally return an update that's not actually an update.
849            if last_epoch < state.epoch {
850                return Poll::Ready(state.clone());
851            }
852        }
853
854        self.add_waker(cx);
855
856        #[cfg(watcher_loom)]
857        loom::thread::yield_now();
858
859        // We check for an update again to prevent races between putting in wakers and looking for updates.
860        {
861            let state = self.state();
862
863            if last_epoch < state.epoch {
864                return Poll::Ready(state.clone());
865            }
866        }
867
868        Poll::Pending
869    }
870
871    fn add_waker(&self, cx: &mut task::Context<'_>) {
872        let mut wakers = self.wakers.lock().expect("poisoned");
873        for waker in wakers.iter() {
874            if waker.will_wake(cx.waker()) {
875                return;
876            }
877        }
878        wakers.push_back(cx.waker().clone());
879    }
880}
881
882#[cfg(test)]
883mod tests {
884
885    use n0_future::{future::poll_once, StreamExt};
886    use rand::{rng, Rng};
887    use tokio::{
888        task::JoinSet,
889        time::{Duration, Instant},
890    };
891    use tokio_util::sync::CancellationToken;
892
893    use super::*;
894
895    #[tokio::test]
896    async fn test_watcher() {
897        let cancel = CancellationToken::new();
898        let watchable = Watchable::new(17);
899
900        assert_eq!(watchable.watch().stream().next().await.unwrap(), 17);
901
902        let start = Instant::now();
903        // spawn watchers
904        let mut tasks = JoinSet::new();
905        for i in 0..3 {
906            let mut watch = watchable.watch().stream();
907            let cancel = cancel.clone();
908            tasks.spawn(async move {
909                println!("[{i}] spawn");
910                let mut expected_value = 17;
911                loop {
912                    tokio::select! {
913                        biased;
914                        Some(value) = &mut watch.next() => {
915                            println!("{:?} [{i}] update: {value}", start.elapsed());
916                            assert_eq!(value, expected_value);
917                            if expected_value == 17 {
918                                expected_value = 0;
919                            } else {
920                                expected_value += 1;
921                            }
922                        },
923                        _ = cancel.cancelled() => {
924                            println!("{:?} [{i}] cancel", start.elapsed());
925                            assert_eq!(expected_value, 10);
926                            break;
927                        }
928                    }
929                }
930            });
931        }
932        for i in 0..3 {
933            let mut watch = watchable.watch().stream_updates_only();
934            let cancel = cancel.clone();
935            tasks.spawn(async move {
936                println!("[{i}] spawn");
937                let mut expected_value = 0;
938                loop {
939                    tokio::select! {
940                        biased;
941                        Some(value) = watch.next() => {
942                            println!("{:?} [{i}] stream update: {value}", start.elapsed());
943                            assert_eq!(value, expected_value);
944                            expected_value += 1;
945                        },
946                        _ = cancel.cancelled() => {
947                            println!("{:?} [{i}] cancel", start.elapsed());
948                            assert_eq!(expected_value, 10);
949                            break;
950                        }
951                        else => {
952                            panic!("stream died");
953                        }
954                    }
955                }
956            });
957        }
958
959        // set value
960        for next_value in 0..10 {
961            let sleep = Duration::from_nanos(rng().random_range(0..100_000_000));
962            println!("{:?} sleep {sleep:?}", start.elapsed());
963            tokio::time::sleep(sleep).await;
964
965            let changed = watchable.set(next_value);
966            println!("{:?} set {next_value} changed={changed:?}", start.elapsed());
967        }
968
969        println!("cancel");
970        cancel.cancel();
971        while let Some(res) = tasks.join_next().await {
972            res.expect("task failed");
973        }
974    }
975
976    #[test]
977    fn test_get() {
978        let watchable = Watchable::new(None);
979        assert!(watchable.get().is_none());
980
981        watchable.set(Some(1u8)).ok();
982        assert_eq!(watchable.get(), Some(1u8));
983    }
984
985    #[tokio::test]
986    async fn test_initialize() {
987        let watchable = Watchable::new(None);
988
989        let mut watcher = watchable.watch();
990        let mut initialized = watcher.initialized();
991
992        let poll = poll_once(&mut initialized).await;
993        assert!(poll.is_none());
994
995        watchable.set(Some(1u8)).ok();
996
997        let poll = poll_once(&mut initialized).await;
998        assert_eq!(poll.unwrap(), 1u8);
999    }
1000
1001    #[tokio::test]
1002    async fn test_initialize_already_init() {
1003        let watchable = Watchable::new(Some(1u8));
1004
1005        let mut watcher = watchable.watch();
1006        let mut initialized = watcher.initialized();
1007
1008        let poll = poll_once(&mut initialized).await;
1009        assert_eq!(poll.unwrap(), 1u8);
1010    }
1011
1012    #[test]
1013    fn test_initialized_always_resolves() {
1014        #[cfg(not(watcher_loom))]
1015        use std::thread;
1016
1017        #[cfg(watcher_loom)]
1018        use loom::thread;
1019
1020        let test_case = || {
1021            let watchable = Watchable::<Option<u8>>::new(None);
1022
1023            let mut watch = watchable.watch();
1024            let thread = thread::spawn(move || n0_future::future::block_on(watch.initialized()));
1025
1026            watchable.set(Some(42)).ok();
1027
1028            thread::yield_now();
1029
1030            let value: u8 = thread.join().unwrap();
1031
1032            assert_eq!(value, 42);
1033        };
1034
1035        #[cfg(watcher_loom)]
1036        loom::model(test_case);
1037        #[cfg(not(watcher_loom))]
1038        test_case();
1039    }
1040
1041    #[tokio::test(flavor = "multi_thread")]
1042    async fn test_update_cancel_safety() {
1043        let watchable = Watchable::new(0);
1044        let mut watch = watchable.watch();
1045        const MAX: usize = 100_000;
1046
1047        let handle = tokio::spawn(async move {
1048            let mut last_observed = 0;
1049
1050            while last_observed != MAX {
1051                tokio::select! {
1052                    val = watch.updated() => {
1053                        let Ok(val) = val else {
1054                            return;
1055                        };
1056
1057                        assert_ne!(val, last_observed, "never observe the same value twice, even with cancellation");
1058                        last_observed = val;
1059                    }
1060                    _ = tokio::time::sleep(Duration::from_micros(rng().random_range(0..10_000))) => {
1061                        // We cancel the other future and start over again
1062                        continue;
1063                    }
1064                }
1065            }
1066        });
1067
1068        for i in 1..=MAX {
1069            watchable.set(i).ok();
1070            if rng().random_bool(0.2) {
1071                tokio::task::yield_now().await;
1072            }
1073        }
1074
1075        tokio::time::timeout(Duration::from_secs(10), handle)
1076            .await
1077            .unwrap()
1078            .unwrap()
1079    }
1080
1081    #[tokio::test]
1082    async fn test_join_simple() {
1083        let a = Watchable::new(1u8);
1084        let b = Watchable::new(1u8);
1085
1086        let mut ab = Join::new([a.watch(), b.watch()].into_iter());
1087
1088        let stream = ab.clone().stream();
1089        let handle = tokio::task::spawn(async move { stream.take(5).collect::<Vec<_>>().await });
1090
1091        // get
1092        assert_eq!(ab.get(), vec![1, 1]);
1093        // set a
1094        a.set(2u8).unwrap();
1095        tokio::task::yield_now().await;
1096        assert_eq!(ab.get(), vec![2, 1]);
1097        // set b
1098        b.set(3u8).unwrap();
1099        tokio::task::yield_now().await;
1100        assert_eq!(ab.get(), vec![2, 3]);
1101
1102        a.set(3u8).unwrap();
1103        tokio::task::yield_now().await;
1104        b.set(4u8).unwrap();
1105        tokio::task::yield_now().await;
1106
1107        let values = tokio::time::timeout(Duration::from_secs(5), handle)
1108            .await
1109            .unwrap()
1110            .unwrap();
1111        assert_eq!(
1112            values,
1113            vec![vec![1, 1], vec![2, 1], vec![2, 3], vec![3, 3], vec![3, 4]]
1114        );
1115    }
1116
1117    #[tokio::test]
1118    async fn test_updated_then_disconnect_then_get() {
1119        let watchable = Watchable::new(10);
1120        let mut watcher = watchable.watch();
1121        assert_eq!(watchable.get(), 10);
1122        watchable.set(42).ok();
1123        assert_eq!(watcher.updated().await.unwrap(), 42);
1124        drop(watchable);
1125        assert_eq!(watcher.get(), 42);
1126    }
1127
1128    #[tokio::test(start_paused = true)]
1129    async fn test_update_wakeup_on_watchable_drop() {
1130        let watchable = Watchable::new(10);
1131        let mut watcher = watchable.watch();
1132
1133        let start = Instant::now();
1134        let (_, result) = tokio::time::timeout(Duration::from_secs(2), async move {
1135            tokio::join!(
1136                async move {
1137                    tokio::time::sleep(Duration::from_secs(1)).await;
1138                    drop(watchable);
1139                },
1140                async move { watcher.updated().await }
1141            )
1142        })
1143        .await
1144        .expect("watcher never updated");
1145        // We should've updated 1s after start, since that's when the watchable was dropped.
1146        // If this is 2s, then the watchable dropping didn't wake up the `Watcher::updated` future.
1147        assert_eq!(start.elapsed(), Duration::from_secs(1));
1148        assert!(result.is_err());
1149    }
1150
1151    #[tokio::test(start_paused = true)]
1152    async fn test_update_wakeup_always_a_change() {
1153        let watchable = Watchable::new(10);
1154        let mut watcher = watchable.watch();
1155
1156        let task = tokio::spawn(async move {
1157            let mut last_value = watcher.get();
1158            let mut values = Vec::new();
1159            while let Ok(value) = watcher.updated().await {
1160                values.push(value);
1161                if last_value == value {
1162                    return Err("value duplicated");
1163                }
1164                last_value = value;
1165            }
1166            Ok(values)
1167        });
1168
1169        // wait for the task to get set up and polled till pending for once
1170        tokio::time::sleep(Duration::from_millis(100)).await;
1171
1172        watchable.set(11).ok();
1173        tokio::time::sleep(Duration::from_millis(100)).await;
1174        let clone = watchable.clone();
1175        drop(clone); // this shouldn't trigger an update
1176        tokio::time::sleep(Duration::from_millis(100)).await;
1177        for i in 1..=10 {
1178            watchable.set(i + 11).ok();
1179            tokio::time::sleep(Duration::from_millis(100)).await;
1180        }
1181        drop(watchable);
1182
1183        let values = task
1184            .await
1185            .expect("task panicked")
1186            .expect("value duplicated");
1187        assert_eq!(values, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]);
1188    }
1189
1190    #[test]
1191    fn test_has_watchers() {
1192        let a = Watchable::new(1u8);
1193        assert!(!a.has_watchers());
1194        let b = a.clone();
1195        assert!(!a.has_watchers());
1196        assert!(!b.has_watchers());
1197
1198        let watcher = a.watch();
1199        assert!(a.has_watchers());
1200        assert!(b.has_watchers());
1201
1202        drop(watcher);
1203
1204        assert!(!a.has_watchers());
1205        assert!(!b.has_watchers());
1206    }
1207
1208    #[tokio::test]
1209    async fn test_three_watchers_basic() {
1210        let watchable = Watchable::new(1u8);
1211
1212        let mut w1 = watchable.watch();
1213        let mut w2 = watchable.watch();
1214        let mut w3 = watchable.watch();
1215
1216        // All see the initial value
1217
1218        assert_eq!(w1.get(), 1);
1219        assert_eq!(w2.get(), 1);
1220        assert_eq!(w3.get(), 1);
1221
1222        // Change  value
1223        watchable.set(42).unwrap();
1224
1225        // All watchers get notified
1226        assert_eq!(w1.updated().await.unwrap(), 42);
1227        assert_eq!(w2.updated().await.unwrap(), 42);
1228        assert_eq!(w3.updated().await.unwrap(), 42);
1229    }
1230
1231    #[tokio::test]
1232    async fn test_three_watchers_skip_intermediate() {
1233        let watchable = Watchable::new(0u8);
1234        let mut watcher = watchable.watch();
1235
1236        watchable.set(1).ok();
1237        watchable.set(2).ok();
1238        watchable.set(3).ok();
1239        watchable.set(4).ok();
1240
1241        let value = watcher.updated().await.unwrap();
1242
1243        assert_eq!(value, 4);
1244    }
1245
1246    #[tokio::test]
1247    async fn test_three_watchers_with_streams() {
1248        let watchable = Watchable::new(10u8);
1249
1250        let mut stream1 = watchable.watch().stream();
1251        let mut stream2 = watchable.watch().stream();
1252        let mut stream3 = watchable.watch().stream_updates_only();
1253
1254        assert_eq!(stream1.next().await.unwrap(), 10);
1255        assert_eq!(stream2.next().await.unwrap(), 10);
1256
1257        // Update the value
1258        watchable.set(20).ok();
1259
1260        // All streams see the update
1261        assert_eq!(stream1.next().await.unwrap(), 20);
1262        assert_eq!(stream2.next().await.unwrap(), 20);
1263        assert_eq!(stream3.next().await.unwrap(), 20);
1264    }
1265
1266    #[tokio::test]
1267    async fn test_three_watchers_independent() {
1268        let watchable = Watchable::new(0u8);
1269
1270        let mut fast_watcher = watchable.watch();
1271        let mut slow_watcher = watchable.watch();
1272        let mut lazy_watcher = watchable.watch();
1273
1274        watchable.set(1).ok();
1275        assert_eq!(fast_watcher.updated().await.unwrap(), 1);
1276
1277        // More updates happen
1278        watchable.set(2).ok();
1279        watchable.set(3).ok();
1280
1281        assert_eq!(slow_watcher.updated().await.unwrap(), 3);
1282        assert_eq!(lazy_watcher.get(), 3);
1283    }
1284
1285    #[tokio::test]
1286    async fn test_combine_three_watchers() {
1287        let a = Watchable::new(1u8);
1288        let b = Watchable::new(2u8);
1289        let c = Watchable::new(3u8);
1290
1291        let mut combined = Triple::new(a.watch(), b.watch(), c.watch());
1292
1293        assert_eq!(combined.get(), (1, 2, 3));
1294
1295        // Update one
1296        b.set(20).ok();
1297
1298        assert_eq!(combined.updated().await.unwrap(), (1, 20, 3));
1299
1300        c.set(30).ok();
1301        assert_eq!(combined.updated().await.unwrap(), (1, 20, 30));
1302    }
1303
1304    #[tokio::test]
1305    async fn test_three_watchers_disconnection() {
1306        let watchable = Watchable::new(5u8);
1307
1308        // All connected
1309        let mut w1 = watchable.watch();
1310        let mut w2 = watchable.watch();
1311        let mut w3 = watchable.watch();
1312
1313        // Drop the watchable
1314        drop(watchable);
1315
1316        // All become disconnected
1317        assert!(!w1.is_connected());
1318        assert!(!w2.is_connected());
1319        assert!(!w3.is_connected());
1320
1321        // Can still get last known value
1322        assert_eq!(w1.get(), 5);
1323        assert_eq!(w2.get(), 5);
1324
1325        // But updates fail
1326        assert!(w3.updated().await.is_err());
1327    }
1328
1329    #[tokio::test]
1330    async fn test_three_watchers_truly_concurrent() {
1331        use tokio::time::sleep;
1332        let watchable = Watchable::new(0u8);
1333
1334        // Spawn three READER tasks
1335        let mut reader_handles = vec![];
1336        for i in 0..3 {
1337            let mut watcher = watchable.watch();
1338            let handle = tokio::spawn(async move {
1339                let mut values = vec![];
1340                // Collect up to 5 updates
1341                for _ in 0..5 {
1342                    if let Ok(value) = watcher.updated().await {
1343                        values.push(value);
1344                    } else {
1345                        break;
1346                    }
1347                }
1348                (i, values)
1349            });
1350            reader_handles.push(handle);
1351        }
1352
1353        // Spawn three WRITER tasks that update concurrently
1354        let mut writer_handles = vec![];
1355        for i in 0..3 {
1356            let watchable_clone = watchable.clone();
1357            let handle = tokio::spawn(async move {
1358                for j in 0..5 {
1359                    let value = (i * 10) + j;
1360                    watchable_clone.set(value).ok();
1361                    sleep(Duration::from_millis(5)).await;
1362                }
1363            });
1364            writer_handles.push(handle);
1365        }
1366
1367        // Wait for writers to finish
1368        for handle in writer_handles {
1369            handle.await.unwrap();
1370        }
1371
1372        // Wait for readers and check results
1373        for handle in reader_handles {
1374            let (task_id, values) = handle.await.unwrap();
1375            println!("Reader {}: saw values {:?}", task_id, values);
1376            assert!(!values.is_empty());
1377        }
1378    }
1379
1380    #[tokio::test]
1381    async fn test_peek() {
1382        let a = Watchable::new(vec![1, 2, 3]);
1383        let mut wa = a.watch();
1384
1385        assert_eq!(wa.get(), vec![1, 2, 3]);
1386        assert_eq!(wa.peek(), &vec![1, 2, 3]);
1387
1388        let mut wa_map = wa.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1389
1390        assert_eq!(wa_map.get(), vec![2, 4, 6]);
1391        assert_eq!(wa_map.peek(), &vec![2, 4, 6]);
1392
1393        let mut wb = a.watch();
1394
1395        assert_eq!(wb.get(), vec![1, 2, 3]);
1396        assert_eq!(wb.peek(), &vec![1, 2, 3]);
1397
1398        let mut wb_map = wb.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1399
1400        assert_eq!(wb_map.get(), vec![2, 4, 6]);
1401        assert_eq!(wb_map.peek(), &vec![2, 4, 6]);
1402
1403        let mut w_join = Join::new([wa_map, wb_map].into_iter());
1404
1405        assert_eq!(w_join.get(), vec![vec![2, 4, 6], vec![2, 4, 6]]);
1406        assert_eq!(w_join.peek(), &vec![vec![2, 4, 6], vec![2, 4, 6]]);
1407    }
1408
1409    #[tokio::test]
1410    async fn test_update_updates_peek() {
1411        let value = Watchable::new(42);
1412        let mut watcher = value.watch();
1413
1414        assert_eq!(watcher.peek(), &42);
1415        assert!(!watcher.update());
1416
1417        value.set(50).ok();
1418
1419        assert_eq!(watcher.peek(), &42); // watcher wasn't updated yet
1420        assert!(watcher.update()); // Update returns true, because there was an update
1421        assert_eq!(watcher.peek(), &50);
1422        assert!(!watcher.update());
1423
1424        let mut watcher_map = watcher.clone().map(|v| v * 2);
1425
1426        assert_eq!(watcher_map.peek(), &100);
1427        assert!(!watcher_map.update());
1428
1429        value.set(10).ok();
1430
1431        assert_eq!(watcher_map.peek(), &100);
1432        assert!(watcher_map.update());
1433        assert_eq!(watcher_map.peek(), &20);
1434        assert!(!watcher_map.update());
1435
1436        let value2 = Watchable::new(0);
1437        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
1438
1439        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1440        assert!(!watcher_join.update());
1441
1442        value.set(0).ok();
1443        value2.set(1).ok();
1444
1445        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1446        assert!(watcher_join.update());
1447        assert_eq!(watcher_join.peek(), &vec![0, 1]);
1448        assert!(!watcher_join.update());
1449    }
1450
1451    #[tokio::test]
1452    async fn test_get_updates_peek() {
1453        let value = Watchable::new(42);
1454        let mut watcher = value.watch();
1455
1456        assert_eq!(watcher.peek(), &42);
1457        assert!(!watcher.update());
1458
1459        value.set(50).ok();
1460
1461        assert_eq!(watcher.peek(), &42); // watcher wasn't updated yet
1462        assert_eq!(watcher.get(), 50); // Update returns true, because there was an update
1463        assert_eq!(watcher.peek(), &50);
1464        assert!(!watcher.update());
1465
1466        let mut watcher_map = watcher.clone().map(|v| v * 2);
1467
1468        assert_eq!(watcher_map.peek(), &100);
1469        assert!(!watcher_map.update());
1470
1471        value.set(10).ok();
1472
1473        assert_eq!(watcher_map.peek(), &100);
1474        assert_eq!(watcher_map.get(), 20);
1475        assert_eq!(watcher_map.peek(), &20);
1476        assert!(!watcher_map.update());
1477
1478        let value2 = Watchable::new(0);
1479        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
1480
1481        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1482        assert!(!watcher_join.update());
1483
1484        value.set(0).ok();
1485        value2.set(1).ok();
1486
1487        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1488        assert_eq!(watcher_join.get(), vec![0, 1]);
1489        assert_eq!(watcher_join.peek(), &vec![0, 1]);
1490        assert!(!watcher_join.update());
1491    }
1492
1493    #[tokio::test]
1494    async fn test_modify_cas() {
1495        let watchable = Watchable::new(0u32);
1496        let mut watcher = watchable.watch();
1497
1498        // Successful CAS: mutates and signals change
1499        let swapped = watchable.modify(|v| {
1500            if *v == 0 {
1501                *v = 1;
1502                true
1503            } else {
1504                false
1505            }
1506        });
1507        assert!(swapped);
1508        assert_eq!(watchable.get(), 1);
1509        assert_eq!(watcher.updated().await.unwrap(), 1);
1510
1511        // Failed CAS: closure leaves value untouched, no notification
1512        let swapped = watchable.modify(|v| {
1513            if *v == 0 {
1514                *v = 2;
1515                true
1516            } else {
1517                false
1518            }
1519        });
1520        assert!(!swapped);
1521        assert_eq!(watchable.get(), 1);
1522        assert!(poll_once(&mut watcher.updated()).await.is_none());
1523
1524        // Closure "writes" but the value is equal: no notification
1525        watchable.modify(|v| *v = 1);
1526        assert_eq!(watchable.get(), 1);
1527        assert!(poll_once(&mut watcher.updated()).await.is_none());
1528
1529        // Return value is forwarded
1530        let prev = watchable.modify(|v| {
1531            let prev = *v;
1532            *v += 10;
1533            prev
1534        });
1535        assert_eq!(prev, 1);
1536        assert_eq!(watcher.updated().await.unwrap(), 11);
1537    }
1538
1539    #[tokio::test]
1540    async fn test_ensure_wakers_bounded() {
1541        use tokio::time::{interval, Duration};
1542        let watchable = Watchable::new(0);
1543        let mut watcher = watchable.watch();
1544        let max_tick = 1000;
1545
1546        let handle = tokio::spawn(async move {
1547            let mut ticker = interval(Duration::from_nanos(1));
1548            let mut tick_no = 0;
1549            loop {
1550                tokio::select! {
1551                    _ = watcher.updated() => {}
1552                    _ = ticker.tick() => {
1553                        // We cancel the other future and start over again
1554                        tick_no += 1;
1555                        if tick_no > max_tick{
1556                            return
1557                        }
1558                    }
1559                }
1560                let num_wakers = watchable.shared.wakers.lock().unwrap().len();
1561                assert_eq!(num_wakers, 1);
1562            }
1563        });
1564
1565        tokio::time::timeout(Duration::from_secs(1), handle)
1566            .await
1567            .unwrap()
1568            .unwrap()
1569    }
1570}