n0_watcher/
lib.rs

1//! Watchable values.
2//!
3//! A [`Watchable`] exists to keep track of a value which may change over time.  It allows
4//! observers to be notified of changes to the value.  The aim is to always be aware of the
5//! **last** value, not to observe *every* value change.
6//!
7//! The reason for this is ergonomics and predictable resource usage: Requiring every
8//! intermediate value to be observable would mean that either the side that sets new values
9//! using [`Watchable::set`] would need to wait for all "receivers" of these intermediate
10//! values to catch up and thus be an async operation, or it would require the receivers
11//! to buffer intermediate values until they've been "received" on the [`Watcher`]s with
12//! an unlimited buffer size and thus potentially unlimited memory growth.
13//!
14//! # Example
15//!
16//! ```
17//! use n0_future::StreamExt;
18//! use n0_watcher::{Watchable, Watcher as _};
19//!
20//! #[tokio::main(flavor = "current_thread", start_paused = true)]
21//! async fn main() {
22//!     let watchable = Watchable::new(None);
23//!
24//!     // A task that waits for the watcher to be initialized to Some(value) before printing it
25//!     let mut watcher = watchable.watch();
26//!     tokio::spawn(async move {
27//!         let initialized_value = watcher.initialized().await;
28//!         println!("initialized: {initialized_value}");
29//!     });
30//!
31//!     // A task that prints every update to the watcher since the initial one:
32//!     let mut updates = watchable.watch().stream_updates_only();
33//!     tokio::spawn(async move {
34//!         while let Some(update) = updates.next().await {
35//!             println!("update: {update:?}");
36//!         }
37//!     });
38//!
39//!     // A task that prints the current value and then every update it can catch,
40//!     // but it also does something else which makes it very slow to pick up new
41//!     // values, so it'll skip some:
42//!     let mut current_and_updates = watchable.watch().stream();
43//!     tokio::spawn(async move {
44//!         while let Some(update) = current_and_updates.next().await {
45//!             println!("update2: {update:?}");
46//!             tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
47//!         }
48//!     });
49//!
50//!     for i in 0..20 {
51//!         println!("Setting watchable to {i}");
52//!         watchable.set(Some(i)).ok();
53//!         tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
54//!     }
55//! }
56//! ```
57//!
58//! # Similar but different
59//!
60//! - `async_channel`: This is a multi-producer, multi-consumer channel implementation.
61//!   Only at most one consumer will receive each "produced" value.
62//!   What we want is to have every "produced" value to be "broadcast" to every receiver.
63//! - `tokio::broadcast`: Also a multi-producer, multi-consumer channel implementation.
64//!   This is very similar to this crate (`tokio::broadcast::Sender` is like [`Watchable`]
65//!   and `tokio::broadcast::Receiver` is like [`Watcher`]), but you can't get the latest
66//!   value without `.await`ing on the receiver, and it'll internally store a queue of
67//!   intermediate values.
68//! - `tokio::watch`: Also a MPSC channel, and unlike `tokio::broadcast` only retains the
69//!   latest value. That module has pretty much the same purpose as this crate, but doesn't
70//!   implement a poll-based method of getting updates and doesn't implement combinators.
71//! - [`std::sync::RwLock`]: (wrapped in an [`std::sync::Arc`]) This allows you access
72//!   to the latest values, but might block while it's being set (but that could be short
73//!   enough not to matter for async rust purposes).
74//!   This doesn't allow you to be notified whenever a new value is written.
75//! - The `watchable` crate: We used to use this crate at n0, but we wanted to experiment
76//!   with different APIs and needed Wasm support.
77#[cfg(not(watcher_loom))]
78use std::sync;
79use std::{
80    collections::VecDeque,
81    future::Future,
82    pin::Pin,
83    sync::{Arc, RwLockReadGuard, Weak},
84    task::{self, ready, Poll, Waker},
85};
86
87#[cfg(watcher_loom)]
88use loom::sync;
89use n0_error::StackError;
90use sync::{Mutex, RwLock};
91
92/// A wrapper around a value that notifies [`Watcher`]s when the value is modified.
93///
94/// Only the most recent value is available to any observer, but the observer is guaranteed
95/// to be notified of the most recent value.
96#[derive(Debug, Default)]
97pub struct Watchable<T> {
98    shared: Arc<Shared<T>>,
99}
100
101impl<T> Clone for Watchable<T> {
102    fn clone(&self) -> Self {
103        Self {
104            shared: self.shared.clone(),
105        }
106    }
107}
108
109/// Abstracts over `Option<T>` and `Vec<T>`
110pub trait Nullable<T> {
111    /// Converts this value into an `Option`.
112    fn into_option(self) -> Option<T>;
113}
114
115impl<T> Nullable<T> for Option<T> {
116    fn into_option(self) -> Option<T> {
117        self
118    }
119}
120
121impl<T> Nullable<T> for Vec<T> {
122    fn into_option(mut self) -> Option<T> {
123        self.pop()
124    }
125}
126
127impl<T: Clone + Eq> Watchable<T> {
128    /// Creates a [`Watchable`] initialized to given value.
129    pub fn new(value: T) -> Self {
130        Self {
131            shared: Arc::new(Shared {
132                state: RwLock::new(State {
133                    value,
134                    epoch: INITIAL_EPOCH,
135                }),
136                watchers: Default::default(),
137            }),
138        }
139    }
140
141    /// Sets a new value.
142    ///
143    /// Returns `Ok(previous_value)` if the value was different from the one set, or
144    /// returns the provided value back as `Err(value)` if the value didn't change.
145    ///
146    /// Watchers are only notified if the value changed.
147    pub fn set(&self, value: T) -> Result<T, T> {
148        // We don't actually write when the value didn't change, but there's unfortunately
149        // no way to upgrade a read guard to a write guard, and locking as read first, then
150        // dropping and locking as write introduces a possible race condition.
151        let mut state = self.shared.state.write().expect("poisoned");
152
153        // Find out if the value changed
154        let changed = state.value != value;
155
156        let ret = if changed {
157            let old = std::mem::replace(&mut state.value, value);
158            state.epoch += 1;
159            Ok(old)
160        } else {
161            Err(value)
162        };
163        drop(state); // No need to write anymore
164
165        // Notify watchers
166        if changed {
167            for watcher in self.shared.watchers.lock().expect("poisoned").drain(..) {
168                watcher.wake();
169            }
170        }
171        ret
172    }
173
174    /// Creates a [`Direct`] [`Watcher`], allowing the value to be observed, but not modified.
175    pub fn watch(&self) -> Direct<T> {
176        Direct {
177            state: self.shared.state().clone(),
178            shared: Some(Arc::downgrade(&self.shared)),
179        }
180    }
181
182    /// Returns the currently stored value.
183    pub fn get(&self) -> T {
184        self.shared.get()
185    }
186
187    /// Returns true when there are any watchers actively listening on changes,
188    /// or false when all watchers have been dropped or none have been created yet.
189    pub fn has_watchers(&self) -> bool {
190        // `Watchable`s will increase the strong count
191        // `Direct`s watchers (which all watchers descend from) will increase the weak count
192        Arc::weak_count(&self.shared) != 0
193    }
194}
195
196impl<T> Drop for Shared<T> {
197    fn drop(&mut self) {
198        let Ok(mut watchers) = self.watchers.lock() else {
199            return; // Poisoned waking?
200        };
201        // Wake all watchers once we drop Shared (this happens when
202        // the last `Watchable` is dropped).
203        // This allows us to notify `NextFut::poll`s and have that
204        // return `Disconnected`.
205        for watcher in watchers.drain(..) {
206            watcher.wake();
207        }
208    }
209}
210
211/// A handle to a value that's represented by one or more underlying [`Watchable`]s.
212///
213/// A [`Watcher`] can get the current value, and will be notified when the value changes.
214/// Only the most recent value is accessible, and if the threads with the underlying [`Watchable`]s
215/// change the value faster than the threads with the [`Watcher`] can keep up with, then
216/// it'll miss in-between values.
217/// When the thread changing the [`Watchable`] pauses updating, the [`Watcher`] will always
218/// end up reporting the most recent state eventually.
219///
220/// Watchers can be modified via [`Watcher::map`] to observe a value derived from the original
221/// value via a function.
222///
223/// Watchers can be combined via [`Watcher::or`] to allow observing multiple values at once and
224/// getting an update in case any of the values updates.
225///
226/// One of the underlying [`Watchable`]s might already be dropped. In that case,
227/// the watcher will be "disconnected" and return [`Err(Disconnected)`](Disconnected)
228/// on some function calls or, when turned into a stream, that stream will end.
229/// This property can also be checked with [`Watcher::is_connected`].
230pub trait Watcher: Clone {
231    /// The type of value that can change.
232    ///
233    /// We require `Clone`, because we need to be able to make
234    /// the values have a lifetime that's detached from the original [`Watchable`]'s
235    /// lifetime.
236    ///
237    /// We require `Eq`, to be able to check whether the value actually changed or
238    /// not, so we can notify or not notify accordingly.
239    type Value: Clone + Eq;
240
241    /// Updates the watcher to the latest value and returns that value.
242    ///
243    /// If any of the underlying [`Watchable`] values have been dropped, then this
244    /// might return an outdated value for that watchable, specifically, the latest
245    /// value that was fetched for that watchable, as opposed to the latest value
246    /// that was set on the watchable before it was dropped.
247    ///
248    /// The default implementation for this is simply
249    /// ```ignore
250    /// fn get(&mut self) -> Self::Value {
251    ///     self.update();
252    ///     self.peek().clone()
253    /// }
254    /// ```
255    fn get(&mut self) -> Self::Value {
256        self.update();
257        self.peek().clone()
258    }
259
260    /// Updates the watcher to the latest value and returns whether it changed.
261    ///
262    /// Watchers keep track of the "latest known" value they fetched.
263    /// This function updates that internal value by looking up the latest value
264    /// at the [`Watchable`]\(s\) that this watcher is linked to.
265    fn update(&mut self) -> bool;
266
267    /// Returns a reference to the value currently stored in the watcher.
268    ///
269    /// Watchers keep track of the "latest known" value they fetched.
270    /// Calling this won't update the latest value, unlike [`Watcher::get`] or
271    /// [`Watcher::update`].
272    ///
273    /// This can be useful if you want to avoid copying out the internal value
274    /// frequently like what [`Watcher::get`] will end up doing.
275    fn peek(&self) -> &Self::Value;
276
277    /// Whether this watcher is still connected to all of its underlying [`Watchable`]s.
278    ///
279    /// Returns false when any of the underlying watchables has been dropped.
280    fn is_connected(&self) -> bool;
281
282    /// Polls for the next value, or returns [`Disconnected`] if one of the underlying
283    /// [`Watchable`]s has been dropped.
284    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>>;
285
286    /// Returns a future completing with `Ok(value)` once a new value is set, or with
287    /// [`Err(Disconnected)`](Disconnected) if the connected [`Watchable`] was dropped.
288    ///
289    /// # Cancel Safety
290    ///
291    /// The returned future is cancel-safe.
292    fn updated(&mut self) -> NextFut<'_, Self> {
293        NextFut { watcher: self }
294    }
295
296    /// Returns a future completing once the value is set to [`Some`] value.
297    ///
298    /// If the current value is [`Some`] value, this future will resolve immediately.
299    ///
300    /// This is a utility for the common case of storing an [`Option`] inside a
301    /// [`Watchable`].
302    ///
303    /// # Cancel Safety
304    ///
305    /// The returned future is cancel-safe.
306    fn initialized<T, W>(&mut self) -> InitializedFut<'_, T, W, Self>
307    where
308        W: Nullable<T> + Clone,
309        Self: Watcher<Value = W>,
310    {
311        InitializedFut {
312            initial: self.get().into_option(),
313            watcher: self,
314        }
315    }
316
317    /// Returns a stream which will yield the most recent values as items.
318    ///
319    /// The first item of the stream is the current value, so that this stream can be easily
320    /// used to operate on the most recent value.
321    ///
322    /// Note however, that only the last item is stored.  If the stream is not polled when an
323    /// item is available it can be replaced with another item by the time it is polled.
324    ///
325    /// This stream ends once the original [`Watchable`] has been dropped.
326    ///
327    /// # Cancel Safety
328    ///
329    /// The returned stream is cancel-safe.
330    fn stream(mut self) -> Stream<Self>
331    where
332        Self: Unpin,
333    {
334        Stream {
335            initial: Some(self.get()),
336            watcher: self,
337        }
338    }
339
340    /// Returns a stream which will yield the most recent values as items, starting from
341    /// the next unobserved future value.
342    ///
343    /// This means this stream will only yield values when the watched value changes,
344    /// the value stored at the time the stream is created is not yielded.
345    ///
346    /// Note however, that only the last item is stored.  If the stream is not polled when an
347    /// item is available it can be replaced with another item by the time it is polled.
348    ///
349    /// This stream ends once the original [`Watchable`] has been dropped.
350    ///
351    /// # Cancel Safety
352    ///
353    /// The returned stream is cancel-safe.
354    fn stream_updates_only(self) -> Stream<Self>
355    where
356        Self: Unpin,
357    {
358        Stream {
359            initial: None,
360            watcher: self,
361        }
362    }
363
364    /// Maps this watcher with a function that transforms the observed values.
365    ///
366    /// The returned watcher will only register updates, when the *mapped* value
367    /// observably changes.
368    fn map<T: Clone + Eq>(
369        mut self,
370        map: impl Fn(Self::Value) -> T + Send + Sync + 'static,
371    ) -> Map<Self, T> {
372        Map {
373            current: (map)(self.get()),
374            map: Arc::new(map),
375            watcher: self,
376        }
377    }
378
379    /// Returns a watcher that updates every time this or the other watcher
380    /// updates, and yields both watcher's items together when that happens.
381    fn or<W: Watcher>(self, other: W) -> Tuple<Self, W> {
382        Tuple::new(self, other)
383    }
384}
385
386/// The immediate, direct observer of a [`Watchable`] value.
387///
388/// This type is mainly used via the [`Watcher`] interface.
389#[derive(Debug, Clone)]
390pub struct Direct<T> {
391    state: State<T>,
392    // We wrap the Weak with an Option, so that we can set it to `None` once we
393    // notice that Weak is not upgradable anymore for the first time.
394    // This allows the weak pointer's allocation to be freed in case this makes
395    // the weak count go to zero (even if Direct is still kept around).
396    shared: Option<Weak<Shared<T>>>,
397}
398
399impl<T: Clone + Eq> Watcher for Direct<T> {
400    type Value = T;
401
402    fn update(&mut self) -> bool {
403        let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
404            self.shared = None; // Weak won't be upgradable in the future, this way we can allow the allocation to be freed
405            return false;
406        };
407        let state = shared.state();
408        if state.epoch > self.state.epoch {
409            self.state = state.clone();
410            true
411        } else {
412            false
413        }
414    }
415
416    fn peek(&self) -> &Self::Value {
417        &self.state.value
418    }
419
420    fn is_connected(&self) -> bool {
421        self.shared
422            .as_ref()
423            .and_then(|weak| weak.upgrade())
424            .is_some()
425    }
426
427    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
428        let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
429            self.shared = None; // Weak won't be upgradable in the future, this way we can allow the allocation to be freed
430            return Poll::Ready(Err(Disconnected));
431        };
432        self.state = ready!(shared.poll_updated(cx, self.state.epoch));
433        Poll::Ready(Ok(()))
434    }
435}
436
437#[derive(Debug, Clone)]
438pub struct Tuple<S: Watcher, T: Watcher> {
439    inner: (S, T),
440    current: (S::Value, T::Value),
441}
442
443impl<S: Watcher, T: Watcher> Tuple<S, T> {
444    pub fn new(mut s: S, mut t: T) -> Self {
445        let current = (s.get(), t.get());
446        Self {
447            inner: (s, t),
448            current,
449        }
450    }
451}
452
453impl<S: Watcher, T: Watcher> Watcher for Tuple<S, T> {
454    type Value = (S::Value, T::Value);
455
456    fn update(&mut self) -> bool {
457        // We need to update all watchers! So don't early-return
458        let s_updated = self.inner.0.update();
459        let t_updated = self.inner.1.update();
460        let updated = s_updated || t_updated;
461        if updated {
462            self.current = (self.inner.0.peek().clone(), self.inner.1.peek().clone());
463        }
464        updated
465    }
466
467    fn peek(&self) -> &Self::Value {
468        &self.current
469    }
470
471    fn is_connected(&self) -> bool {
472        self.inner.0.is_connected() && self.inner.1.is_connected()
473    }
474
475    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
476        let poll_0 = self.inner.0.poll_updated(cx)?;
477        let poll_1 = self.inner.1.poll_updated(cx)?;
478        if poll_0.is_pending() && poll_1.is_pending() {
479            return Poll::Pending;
480        }
481        if poll_0.is_ready() {
482            self.current.0 = self.inner.0.peek().clone();
483        }
484        if poll_1.is_ready() {
485            self.current.1 = self.inner.1.peek().clone();
486        }
487        Poll::Ready(Ok(()))
488    }
489}
490
491#[derive(Debug, Clone)]
492pub struct Triple<S: Watcher, T: Watcher, U: Watcher> {
493    inner: (S, T, U),
494    current: (S::Value, T::Value, U::Value),
495}
496
497impl<S: Watcher, T: Watcher, U: Watcher> Triple<S, T, U> {
498    pub fn new(mut s: S, mut t: T, mut u: U) -> Self {
499        let current = (s.get(), t.get(), u.get());
500        Self {
501            inner: (s, t, u),
502            current,
503        }
504    }
505}
506
507impl<S: Watcher, T: Watcher, U: Watcher> Watcher for Triple<S, T, U> {
508    type Value = (S::Value, T::Value, U::Value);
509
510    fn update(&mut self) -> bool {
511        // We need to update all watchers! So don't early-return
512        let s_updated = self.inner.0.update();
513        let t_updated = self.inner.1.update();
514        let u_updated = self.inner.2.update();
515        let updated = s_updated || t_updated || u_updated;
516        if updated {
517            self.current = (
518                self.inner.0.peek().clone(),
519                self.inner.1.peek().clone(),
520                self.inner.2.peek().clone(),
521            );
522        }
523        updated
524    }
525
526    fn peek(&self) -> &Self::Value {
527        &self.current
528    }
529
530    fn is_connected(&self) -> bool {
531        self.inner.0.is_connected() && self.inner.1.is_connected() && self.inner.2.is_connected()
532    }
533
534    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
535        let poll_0 = self.inner.0.poll_updated(cx)?;
536        let poll_1 = self.inner.1.poll_updated(cx)?;
537        let poll_2 = self.inner.2.poll_updated(cx)?;
538
539        if poll_0.is_pending() && poll_1.is_pending() && poll_2.is_pending() {
540            return Poll::Pending;
541        }
542        if poll_0.is_ready() {
543            self.current.0 = self.inner.0.peek().clone();
544        }
545        if poll_1.is_ready() {
546            self.current.1 = self.inner.1.peek().clone();
547        }
548        if poll_2.is_ready() {
549            self.current.2 = self.inner.2.peek().clone();
550        }
551        Poll::Ready(Ok(()))
552    }
553}
554
555/// Combinator to join two watchers
556#[derive(Debug, Clone)]
557pub struct Join<T: Clone + Eq, W: Watcher<Value = T>> {
558    // invariant: watchers.len() == current.len()
559    watchers: Vec<W>,
560    current: Vec<T>,
561}
562
563impl<T: Clone + Eq, W: Watcher<Value = T>> Join<T, W> {
564    /// Joins a set of watchers into a single watcher
565    pub fn new(watchers: impl Iterator<Item = W>) -> Self {
566        let mut watchers: Vec<W> = watchers.into_iter().collect();
567
568        let mut current = Vec::with_capacity(watchers.len());
569        for watcher in &mut watchers {
570            current.push(watcher.get());
571        }
572        Self { watchers, current }
573    }
574}
575
576impl<T: Clone + Eq, W: Watcher<Value = T>> Watcher for Join<T, W> {
577    type Value = Vec<T>;
578
579    fn update(&mut self) -> bool {
580        let mut any_updated = false;
581        for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
582            if watcher.update() {
583                any_updated = true;
584                *value = watcher.peek().clone();
585            }
586        }
587        any_updated
588    }
589
590    fn peek(&self) -> &Self::Value {
591        &self.current
592    }
593
594    fn is_connected(&self) -> bool {
595        self.watchers.iter().all(|w| w.is_connected())
596    }
597
598    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
599        let mut any_updated = false;
600        for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
601            if watcher.poll_updated(cx)?.is_ready() {
602                any_updated = true;
603                *value = watcher.peek().clone();
604            }
605        }
606
607        if any_updated {
608            Poll::Ready(Ok(()))
609        } else {
610            Poll::Pending
611        }
612    }
613}
614
615/// Wraps a [`Watcher`] to allow observing a derived value.
616///
617/// See [`Watcher::map`].
618#[derive(derive_more::Debug, Clone)]
619pub struct Map<W: Watcher, T: Clone + Eq> {
620    #[debug("Arc<dyn Fn(W::Value) -> T>")]
621    map: Arc<dyn Fn(W::Value) -> T + Send + Sync + 'static>,
622    watcher: W,
623    current: T,
624}
625
626impl<W: Watcher, T: Clone + Eq> Watcher for Map<W, T> {
627    type Value = T;
628
629    fn update(&mut self) -> bool {
630        if self.watcher.update() {
631            let new = (self.map)(self.watcher.peek().clone());
632            if new != self.current {
633                self.current = new;
634                true
635            } else {
636                false
637            }
638        } else {
639            false
640        }
641    }
642
643    fn peek(&self) -> &Self::Value {
644        &self.current
645    }
646
647    fn is_connected(&self) -> bool {
648        self.watcher.is_connected()
649    }
650
651    fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
652        loop {
653            ready!(self.watcher.poll_updated(cx)?);
654            let new = (self.map)(self.watcher.peek().clone());
655            if new != self.current {
656                self.current = new;
657                return Poll::Ready(Ok(()));
658            }
659        }
660    }
661}
662
663/// Future returning the next item after the current one in a [`Watcher`].
664///
665/// See [`Watcher::updated`].
666///
667/// # Cancel Safety
668///
669/// This future is cancel-safe.
670#[derive(Debug)]
671pub struct NextFut<'a, W: Watcher> {
672    watcher: &'a mut W,
673}
674
675impl<W: Watcher> Future for NextFut<'_, W> {
676    type Output = Result<W::Value, Disconnected>;
677
678    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
679        ready!(self.watcher.poll_updated(cx))?;
680        Poll::Ready(Ok(self.watcher.peek().clone()))
681    }
682}
683
684/// Future returning the current or next value that's [`Some`] value.
685/// in a [`Watcher`].
686///
687/// See [`Watcher::initialized`].
688///
689/// # Cancel Safety
690///
691/// This Future is cancel-safe.
692#[derive(Debug)]
693pub struct InitializedFut<'a, T, V: Nullable<T> + Clone, W: Watcher<Value = V>> {
694    initial: Option<T>,
695    watcher: &'a mut W,
696}
697
698impl<T: Clone + Eq + Unpin, V: Nullable<T> + Clone, W: Watcher<Value = V> + Unpin> Future
699    for InitializedFut<'_, T, V, W>
700{
701    type Output = T;
702
703    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
704        let mut this = self.as_mut();
705        if let Some(value) = this.initial.take() {
706            return Poll::Ready(value);
707        }
708        loop {
709            if ready!(this.watcher.poll_updated(cx)).is_err() {
710                // The value will never be initialized
711                return Poll::Pending;
712            };
713            let value = this.watcher.peek();
714            if let Some(value) = value.clone().into_option() {
715                return Poll::Ready(value);
716            }
717        }
718    }
719}
720
721/// A stream for a [`Watcher`]'s next values.
722///
723/// See [`Watcher::stream`] and [`Watcher::stream_updates_only`].
724///
725/// # Cancel Safety
726///
727/// This stream is cancel-safe.
728#[derive(Debug, Clone)]
729pub struct Stream<W: Watcher + Unpin> {
730    initial: Option<W::Value>,
731    watcher: W,
732}
733
734impl<W: Watcher + Unpin> n0_future::Stream for Stream<W>
735where
736    W::Value: Unpin,
737{
738    type Item = W::Value;
739
740    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
741        if let Some(value) = self.as_mut().initial.take() {
742            return Poll::Ready(Some(value));
743        }
744        match self.as_mut().watcher.poll_updated(cx) {
745            Poll::Ready(Ok(())) => Poll::Ready(Some(self.as_ref().watcher.peek().clone())),
746            Poll::Ready(Err(Disconnected)) => Poll::Ready(None),
747            Poll::Pending => Poll::Pending,
748        }
749    }
750}
751
752/// The error for when a [`Watcher`] is disconnected from its underlying
753/// [`Watchable`] value, because of that watchable having been dropped.
754#[derive(StackError)]
755#[error("Watcher lost connection to underlying Watchable, it was dropped")]
756pub struct Disconnected;
757
758// Private:
759
760const INITIAL_EPOCH: u64 = 1;
761
762/// The shared state for a [`Watchable`].
763#[derive(Debug, Default)]
764struct Shared<T> {
765    /// The value to be watched and its current epoch.
766    state: RwLock<State<T>>,
767    watchers: Mutex<VecDeque<Waker>>,
768}
769
770#[derive(Debug, Clone)]
771struct State<T> {
772    value: T,
773    epoch: u64,
774}
775
776impl<T: Default> Default for State<T> {
777    fn default() -> Self {
778        Self {
779            value: Default::default(),
780            epoch: INITIAL_EPOCH,
781        }
782    }
783}
784
785impl<T: Clone> Shared<T> {
786    fn get(&self) -> T {
787        self.state.read().expect("poisoned").value.clone()
788    }
789
790    fn state(&self) -> RwLockReadGuard<'_, State<T>> {
791        self.state.read().expect("poisoned")
792    }
793
794    fn poll_updated(&self, cx: &mut task::Context<'_>, last_epoch: u64) -> Poll<State<T>> {
795        {
796            let state = self.state();
797
798            // We might get spurious wakeups due to e.g. a second-to-last Watchable being dropped.
799            // This makes sure we don't accidentally return an update that's not actually an update.
800            if last_epoch < state.epoch {
801                return Poll::Ready(state.clone());
802            }
803        }
804
805        self.watchers
806            .lock()
807            .expect("poisoned")
808            .push_back(cx.waker().to_owned());
809
810        #[cfg(watcher_loom)]
811        loom::thread::yield_now();
812
813        // We check for an update again to prevent races between putting in wakers and looking for updates.
814        {
815            let state = self.state();
816
817            if last_epoch < state.epoch {
818                return Poll::Ready(state.clone());
819            }
820        }
821
822        Poll::Pending
823    }
824}
825
826#[cfg(test)]
827mod tests {
828
829    use n0_future::{future::poll_once, StreamExt};
830    use rand::{rng, Rng};
831    use tokio::{
832        task::JoinSet,
833        time::{Duration, Instant},
834    };
835    use tokio_util::sync::CancellationToken;
836
837    use super::*;
838
839    #[tokio::test]
840    async fn test_watcher() {
841        let cancel = CancellationToken::new();
842        let watchable = Watchable::new(17);
843
844        assert_eq!(watchable.watch().stream().next().await.unwrap(), 17);
845
846        let start = Instant::now();
847        // spawn watchers
848        let mut tasks = JoinSet::new();
849        for i in 0..3 {
850            let mut watch = watchable.watch().stream();
851            let cancel = cancel.clone();
852            tasks.spawn(async move {
853                println!("[{i}] spawn");
854                let mut expected_value = 17;
855                loop {
856                    tokio::select! {
857                        biased;
858                        Some(value) = &mut watch.next() => {
859                            println!("{:?} [{i}] update: {value}", start.elapsed());
860                            assert_eq!(value, expected_value);
861                            if expected_value == 17 {
862                                expected_value = 0;
863                            } else {
864                                expected_value += 1;
865                            }
866                        },
867                        _ = cancel.cancelled() => {
868                            println!("{:?} [{i}] cancel", start.elapsed());
869                            assert_eq!(expected_value, 10);
870                            break;
871                        }
872                    }
873                }
874            });
875        }
876        for i in 0..3 {
877            let mut watch = watchable.watch().stream_updates_only();
878            let cancel = cancel.clone();
879            tasks.spawn(async move {
880                println!("[{i}] spawn");
881                let mut expected_value = 0;
882                loop {
883                    tokio::select! {
884                        biased;
885                        Some(value) = watch.next() => {
886                            println!("{:?} [{i}] stream update: {value}", start.elapsed());
887                            assert_eq!(value, expected_value);
888                            expected_value += 1;
889                        },
890                        _ = cancel.cancelled() => {
891                            println!("{:?} [{i}] cancel", start.elapsed());
892                            assert_eq!(expected_value, 10);
893                            break;
894                        }
895                        else => {
896                            panic!("stream died");
897                        }
898                    }
899                }
900            });
901        }
902
903        // set value
904        for next_value in 0..10 {
905            let sleep = Duration::from_nanos(rng().random_range(0..100_000_000));
906            println!("{:?} sleep {sleep:?}", start.elapsed());
907            tokio::time::sleep(sleep).await;
908
909            let changed = watchable.set(next_value);
910            println!("{:?} set {next_value} changed={changed:?}", start.elapsed());
911        }
912
913        println!("cancel");
914        cancel.cancel();
915        while let Some(res) = tasks.join_next().await {
916            res.expect("task failed");
917        }
918    }
919
920    #[test]
921    fn test_get() {
922        let watchable = Watchable::new(None);
923        assert!(watchable.get().is_none());
924
925        watchable.set(Some(1u8)).ok();
926        assert_eq!(watchable.get(), Some(1u8));
927    }
928
929    #[tokio::test]
930    async fn test_initialize() {
931        let watchable = Watchable::new(None);
932
933        let mut watcher = watchable.watch();
934        let mut initialized = watcher.initialized();
935
936        let poll = poll_once(&mut initialized).await;
937        assert!(poll.is_none());
938
939        watchable.set(Some(1u8)).ok();
940
941        let poll = poll_once(&mut initialized).await;
942        assert_eq!(poll.unwrap(), 1u8);
943    }
944
945    #[tokio::test]
946    async fn test_initialize_already_init() {
947        let watchable = Watchable::new(Some(1u8));
948
949        let mut watcher = watchable.watch();
950        let mut initialized = watcher.initialized();
951
952        let poll = poll_once(&mut initialized).await;
953        assert_eq!(poll.unwrap(), 1u8);
954    }
955
956    #[test]
957    fn test_initialized_always_resolves() {
958        #[cfg(not(watcher_loom))]
959        use std::thread;
960
961        #[cfg(watcher_loom)]
962        use loom::thread;
963
964        let test_case = || {
965            let watchable = Watchable::<Option<u8>>::new(None);
966
967            let mut watch = watchable.watch();
968            let thread = thread::spawn(move || n0_future::future::block_on(watch.initialized()));
969
970            watchable.set(Some(42)).ok();
971
972            thread::yield_now();
973
974            let value: u8 = thread.join().unwrap();
975
976            assert_eq!(value, 42);
977        };
978
979        #[cfg(watcher_loom)]
980        loom::model(test_case);
981        #[cfg(not(watcher_loom))]
982        test_case();
983    }
984
985    #[tokio::test(flavor = "multi_thread")]
986    async fn test_update_cancel_safety() {
987        let watchable = Watchable::new(0);
988        let mut watch = watchable.watch();
989        const MAX: usize = 100_000;
990
991        let handle = tokio::spawn(async move {
992            let mut last_observed = 0;
993
994            while last_observed != MAX {
995                tokio::select! {
996                    val = watch.updated() => {
997                        let Ok(val) = val else {
998                            return;
999                        };
1000
1001                        assert_ne!(val, last_observed, "never observe the same value twice, even with cancellation");
1002                        last_observed = val;
1003                    }
1004                    _ = tokio::time::sleep(Duration::from_micros(rng().random_range(0..10_000))) => {
1005                        // We cancel the other future and start over again
1006                        continue;
1007                    }
1008                }
1009            }
1010        });
1011
1012        for i in 1..=MAX {
1013            watchable.set(i).ok();
1014            if rng().random_bool(0.2) {
1015                tokio::task::yield_now().await;
1016            }
1017        }
1018
1019        tokio::time::timeout(Duration::from_secs(10), handle)
1020            .await
1021            .unwrap()
1022            .unwrap()
1023    }
1024
1025    #[tokio::test]
1026    async fn test_join_simple() {
1027        let a = Watchable::new(1u8);
1028        let b = Watchable::new(1u8);
1029
1030        let mut ab = Join::new([a.watch(), b.watch()].into_iter());
1031
1032        let stream = ab.clone().stream();
1033        let handle = tokio::task::spawn(async move { stream.take(5).collect::<Vec<_>>().await });
1034
1035        // get
1036        assert_eq!(ab.get(), vec![1, 1]);
1037        // set a
1038        a.set(2u8).unwrap();
1039        tokio::task::yield_now().await;
1040        assert_eq!(ab.get(), vec![2, 1]);
1041        // set b
1042        b.set(3u8).unwrap();
1043        tokio::task::yield_now().await;
1044        assert_eq!(ab.get(), vec![2, 3]);
1045
1046        a.set(3u8).unwrap();
1047        tokio::task::yield_now().await;
1048        b.set(4u8).unwrap();
1049        tokio::task::yield_now().await;
1050
1051        let values = tokio::time::timeout(Duration::from_secs(5), handle)
1052            .await
1053            .unwrap()
1054            .unwrap();
1055        assert_eq!(
1056            values,
1057            vec![vec![1, 1], vec![2, 1], vec![2, 3], vec![3, 3], vec![3, 4]]
1058        );
1059    }
1060
1061    #[tokio::test]
1062    async fn test_updated_then_disconnect_then_get() {
1063        let watchable = Watchable::new(10);
1064        let mut watcher = watchable.watch();
1065        assert_eq!(watchable.get(), 10);
1066        watchable.set(42).ok();
1067        assert_eq!(watcher.updated().await.unwrap(), 42);
1068        drop(watchable);
1069        assert_eq!(watcher.get(), 42);
1070    }
1071
1072    #[tokio::test(start_paused = true)]
1073    async fn test_update_wakeup_on_watchable_drop() {
1074        let watchable = Watchable::new(10);
1075        let mut watcher = watchable.watch();
1076
1077        let start = Instant::now();
1078        let (_, result) = tokio::time::timeout(Duration::from_secs(2), async move {
1079            tokio::join!(
1080                async move {
1081                    tokio::time::sleep(Duration::from_secs(1)).await;
1082                    drop(watchable);
1083                },
1084                async move { watcher.updated().await }
1085            )
1086        })
1087        .await
1088        .expect("watcher never updated");
1089        // We should've updated 1s after start, since that's when the watchable was dropped.
1090        // If this is 2s, then the watchable dropping didn't wake up the `Watcher::updated` future.
1091        assert_eq!(start.elapsed(), Duration::from_secs(1));
1092        assert!(result.is_err());
1093    }
1094
1095    #[tokio::test(start_paused = true)]
1096    async fn test_update_wakeup_always_a_change() {
1097        let watchable = Watchable::new(10);
1098        let mut watcher = watchable.watch();
1099
1100        let task = tokio::spawn(async move {
1101            let mut last_value = watcher.get();
1102            let mut values = Vec::new();
1103            while let Ok(value) = watcher.updated().await {
1104                values.push(value);
1105                if last_value == value {
1106                    return Err("value duplicated");
1107                }
1108                last_value = value;
1109            }
1110            Ok(values)
1111        });
1112
1113        // wait for the task to get set up and polled till pending for once
1114        tokio::time::sleep(Duration::from_millis(100)).await;
1115
1116        watchable.set(11).ok();
1117        tokio::time::sleep(Duration::from_millis(100)).await;
1118        let clone = watchable.clone();
1119        drop(clone); // this shouldn't trigger an update
1120        tokio::time::sleep(Duration::from_millis(100)).await;
1121        for i in 1..=10 {
1122            watchable.set(i + 11).ok();
1123            tokio::time::sleep(Duration::from_millis(100)).await;
1124        }
1125        drop(watchable);
1126
1127        let values = task
1128            .await
1129            .expect("task panicked")
1130            .expect("value duplicated");
1131        assert_eq!(values, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]);
1132    }
1133
1134    #[test]
1135    fn test_has_watchers() {
1136        let a = Watchable::new(1u8);
1137        assert!(!a.has_watchers());
1138        let b = a.clone();
1139        assert!(!a.has_watchers());
1140        assert!(!b.has_watchers());
1141
1142        let watcher = a.watch();
1143        assert!(a.has_watchers());
1144        assert!(b.has_watchers());
1145
1146        drop(watcher);
1147
1148        assert!(!a.has_watchers());
1149        assert!(!b.has_watchers());
1150    }
1151
1152    #[tokio::test]
1153    async fn test_three_watchers_basic() {
1154        let watchable = Watchable::new(1u8);
1155
1156        let mut w1 = watchable.watch();
1157        let mut w2 = watchable.watch();
1158        let mut w3 = watchable.watch();
1159
1160        // All see the initial value
1161
1162        assert_eq!(w1.get(), 1);
1163        assert_eq!(w2.get(), 1);
1164        assert_eq!(w3.get(), 1);
1165
1166        // Change  value
1167        watchable.set(42).unwrap();
1168
1169        // All watchers get notified
1170        assert_eq!(w1.updated().await.unwrap(), 42);
1171        assert_eq!(w2.updated().await.unwrap(), 42);
1172        assert_eq!(w3.updated().await.unwrap(), 42);
1173    }
1174
1175    #[tokio::test]
1176    async fn test_three_watchers_skip_intermediate() {
1177        let watchable = Watchable::new(0u8);
1178        let mut watcher = watchable.watch();
1179
1180        watchable.set(1).ok();
1181        watchable.set(2).ok();
1182        watchable.set(3).ok();
1183        watchable.set(4).ok();
1184
1185        let value = watcher.updated().await.unwrap();
1186
1187        assert_eq!(value, 4);
1188    }
1189
1190    #[tokio::test]
1191    async fn test_three_watchers_with_streams() {
1192        let watchable = Watchable::new(10u8);
1193
1194        let mut stream1 = watchable.watch().stream();
1195        let mut stream2 = watchable.watch().stream();
1196        let mut stream3 = watchable.watch().stream_updates_only();
1197
1198        assert_eq!(stream1.next().await.unwrap(), 10);
1199        assert_eq!(stream2.next().await.unwrap(), 10);
1200
1201        // Update the value
1202        watchable.set(20).ok();
1203
1204        // All streams see the update
1205        assert_eq!(stream1.next().await.unwrap(), 20);
1206        assert_eq!(stream2.next().await.unwrap(), 20);
1207        assert_eq!(stream3.next().await.unwrap(), 20);
1208    }
1209
1210    #[tokio::test]
1211    async fn test_three_watchers_independent() {
1212        let watchable = Watchable::new(0u8);
1213
1214        let mut fast_watcher = watchable.watch();
1215        let mut slow_watcher = watchable.watch();
1216        let mut lazy_watcher = watchable.watch();
1217
1218        watchable.set(1).ok();
1219        assert_eq!(fast_watcher.updated().await.unwrap(), 1);
1220
1221        // More updates happen
1222        watchable.set(2).ok();
1223        watchable.set(3).ok();
1224
1225        assert_eq!(slow_watcher.updated().await.unwrap(), 3);
1226        assert_eq!(lazy_watcher.get(), 3);
1227    }
1228
1229    #[tokio::test]
1230    async fn test_combine_three_watchers() {
1231        let a = Watchable::new(1u8);
1232        let b = Watchable::new(2u8);
1233        let c = Watchable::new(3u8);
1234
1235        let mut combined = Triple::new(a.watch(), b.watch(), c.watch());
1236
1237        assert_eq!(combined.get(), (1, 2, 3));
1238
1239        // Update one
1240        b.set(20).ok();
1241
1242        assert_eq!(combined.updated().await.unwrap(), (1, 20, 3));
1243
1244        c.set(30).ok();
1245        assert_eq!(combined.updated().await.unwrap(), (1, 20, 30));
1246    }
1247
1248    #[tokio::test]
1249    async fn test_three_watchers_disconnection() {
1250        let watchable = Watchable::new(5u8);
1251
1252        // All connected
1253        let mut w1 = watchable.watch();
1254        let mut w2 = watchable.watch();
1255        let mut w3 = watchable.watch();
1256
1257        // Drop the watchable
1258        drop(watchable);
1259
1260        // All become disconnected
1261        assert!(!w1.is_connected());
1262        assert!(!w2.is_connected());
1263        assert!(!w3.is_connected());
1264
1265        // Can still get last known value
1266        assert_eq!(w1.get(), 5);
1267        assert_eq!(w2.get(), 5);
1268
1269        // But updates fail
1270        assert!(w3.updated().await.is_err());
1271    }
1272
1273    #[tokio::test]
1274    async fn test_three_watchers_truly_concurrent() {
1275        use tokio::time::sleep;
1276        let watchable = Watchable::new(0u8);
1277
1278        // Spawn three READER tasks
1279        let mut reader_handles = vec![];
1280        for i in 0..3 {
1281            let mut watcher = watchable.watch();
1282            let handle = tokio::spawn(async move {
1283                let mut values = vec![];
1284                // Collect up to 5 updates
1285                for _ in 0..5 {
1286                    if let Ok(value) = watcher.updated().await {
1287                        values.push(value);
1288                    } else {
1289                        break;
1290                    }
1291                }
1292                (i, values)
1293            });
1294            reader_handles.push(handle);
1295        }
1296
1297        // Spawn three WRITER tasks that update concurrently
1298        let mut writer_handles = vec![];
1299        for i in 0..3 {
1300            let watchable_clone = watchable.clone();
1301            let handle = tokio::spawn(async move {
1302                for j in 0..5 {
1303                    let value = (i * 10) + j;
1304                    watchable_clone.set(value).ok();
1305                    sleep(Duration::from_millis(5)).await;
1306                }
1307            });
1308            writer_handles.push(handle);
1309        }
1310
1311        // Wait for writers to finish
1312        for handle in writer_handles {
1313            handle.await.unwrap();
1314        }
1315
1316        // Wait for readers and check results
1317        for handle in reader_handles {
1318            let (task_id, values) = handle.await.unwrap();
1319            println!("Reader {}: saw values {:?}", task_id, values);
1320            assert!(!values.is_empty());
1321        }
1322    }
1323
1324    #[tokio::test]
1325    async fn test_peek() {
1326        let a = Watchable::new(vec![1, 2, 3]);
1327        let mut wa = a.watch();
1328
1329        assert_eq!(wa.get(), vec![1, 2, 3]);
1330        assert_eq!(wa.peek(), &vec![1, 2, 3]);
1331
1332        let mut wa_map = wa.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1333
1334        assert_eq!(wa_map.get(), vec![2, 4, 6]);
1335        assert_eq!(wa_map.peek(), &vec![2, 4, 6]);
1336
1337        let mut wb = a.watch();
1338
1339        assert_eq!(wb.get(), vec![1, 2, 3]);
1340        assert_eq!(wb.peek(), &vec![1, 2, 3]);
1341
1342        let mut wb_map = wb.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1343
1344        assert_eq!(wb_map.get(), vec![2, 4, 6]);
1345        assert_eq!(wb_map.peek(), &vec![2, 4, 6]);
1346
1347        let mut w_join = Join::new([wa_map, wb_map].into_iter());
1348
1349        assert_eq!(w_join.get(), vec![vec![2, 4, 6], vec![2, 4, 6]]);
1350        assert_eq!(w_join.peek(), &vec![vec![2, 4, 6], vec![2, 4, 6]]);
1351    }
1352
1353    #[tokio::test]
1354    async fn test_update_updates_peek() {
1355        let value = Watchable::new(42);
1356        let mut watcher = value.watch();
1357
1358        assert_eq!(watcher.peek(), &42);
1359        assert!(!watcher.update());
1360
1361        value.set(50).ok();
1362
1363        assert_eq!(watcher.peek(), &42); // watcher wasn't updated yet
1364        assert!(watcher.update()); // Update returns true, because there was an update
1365        assert_eq!(watcher.peek(), &50);
1366        assert!(!watcher.update());
1367
1368        let mut watcher_map = watcher.clone().map(|v| v * 2);
1369
1370        assert_eq!(watcher_map.peek(), &100);
1371        assert!(!watcher_map.update());
1372
1373        value.set(10).ok();
1374
1375        assert_eq!(watcher_map.peek(), &100);
1376        assert!(watcher_map.update());
1377        assert_eq!(watcher_map.peek(), &20);
1378        assert!(!watcher_map.update());
1379
1380        let value2 = Watchable::new(0);
1381        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
1382
1383        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1384        assert!(!watcher_join.update());
1385
1386        value.set(0).ok();
1387        value2.set(1).ok();
1388
1389        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1390        assert!(watcher_join.update());
1391        assert_eq!(watcher_join.peek(), &vec![0, 1]);
1392        assert!(!watcher_join.update());
1393    }
1394
1395    #[tokio::test]
1396    async fn test_get_updates_peek() {
1397        let value = Watchable::new(42);
1398        let mut watcher = value.watch();
1399
1400        assert_eq!(watcher.peek(), &42);
1401        assert!(!watcher.update());
1402
1403        value.set(50).ok();
1404
1405        assert_eq!(watcher.peek(), &42); // watcher wasn't updated yet
1406        assert_eq!(watcher.get(), 50); // Update returns true, because there was an update
1407        assert_eq!(watcher.peek(), &50);
1408        assert!(!watcher.update());
1409
1410        let mut watcher_map = watcher.clone().map(|v| v * 2);
1411
1412        assert_eq!(watcher_map.peek(), &100);
1413        assert!(!watcher_map.update());
1414
1415        value.set(10).ok();
1416
1417        assert_eq!(watcher_map.peek(), &100);
1418        assert_eq!(watcher_map.get(), 20);
1419        assert_eq!(watcher_map.peek(), &20);
1420        assert!(!watcher_map.update());
1421
1422        let value2 = Watchable::new(0);
1423        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
1424
1425        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1426        assert!(!watcher_join.update());
1427
1428        value.set(0).ok();
1429        value2.set(1).ok();
1430
1431        assert_eq!(watcher_join.peek(), &vec![10, 0]);
1432        assert_eq!(watcher_join.get(), vec![0, 1]);
1433        assert_eq!(watcher_join.peek(), &vec![0, 1]);
1434        assert!(!watcher_join.update());
1435    }
1436}