1#[cfg(not(watcher_loom))]
78use std::sync;
79use std::{
80 collections::VecDeque,
81 future::Future,
82 pin::Pin,
83 sync::{Arc, RwLockReadGuard, Weak},
84 task::{self, ready, Poll, Waker},
85};
86
87#[cfg(watcher_loom)]
88use loom::sync;
89use n0_error::StackError;
90use sync::{Mutex, RwLock};
91
92#[derive(Debug, Default)]
97pub struct Watchable<T> {
98 shared: Arc<Shared<T>>,
99}
100
101impl<T> Clone for Watchable<T> {
102 fn clone(&self) -> Self {
103 Self {
104 shared: self.shared.clone(),
105 }
106 }
107}
108
/// Abstraction over containers that may or may not currently hold a `T`.
pub trait Nullable<T> {
    /// Consumes the container and converts it into an [`Option`].
    fn into_option(self) -> Option<T>;
}

impl<T> Nullable<T> for Option<T> {
    /// An `Option` already is the target type, so it is returned unchanged.
    fn into_option(self) -> Option<T> {
        self
    }
}

impl<T> Nullable<T> for Vec<T> {
    /// Yields the *last* element of the vector, or `None` if it is empty.
    fn into_option(self) -> Option<T> {
        self.into_iter().next_back()
    }
}
126
127impl<T: Clone + Eq> Watchable<T> {
128 pub fn new(value: T) -> Self {
130 Self {
131 shared: Arc::new(Shared {
132 state: RwLock::new(State {
133 value,
134 epoch: INITIAL_EPOCH,
135 }),
136 wakers: Default::default(),
137 }),
138 }
139 }
140
141 pub fn set(&self, value: T) -> Result<T, T> {
148 let mut state = self.shared.state.write().expect("poisoned");
152
153 let changed = state.value != value;
155
156 let ret = if changed {
157 let old = std::mem::replace(&mut state.value, value);
158 state.epoch += 1;
159 Ok(old)
160 } else {
161 Err(value)
162 };
163 drop(state); if changed {
167 for watcher in self.shared.wakers.lock().expect("poisoned").drain(..) {
168 watcher.wake();
169 }
170 }
171 ret
172 }
173
174 pub fn modify<F, R>(&self, f: F) -> R
203 where
204 F: FnOnce(&mut T) -> R,
205 {
206 let mut state = self.shared.state.write().expect("poisoned");
207 let old = state.value.clone();
208 let ret = f(&mut state.value);
209 let changed = old != state.value;
210 if changed {
211 state.epoch += 1;
212 }
213 drop(state);
214
215 if changed {
216 for watcher in self.shared.wakers.lock().expect("poisoned").drain(..) {
217 watcher.wake();
218 }
219 }
220 ret
221 }
222
223 pub fn watch(&self) -> Direct<T> {
225 Direct {
226 state: self.shared.state().clone(),
227 shared: Some(Arc::downgrade(&self.shared)),
228 }
229 }
230
231 pub fn get(&self) -> T {
233 self.shared.get()
234 }
235
236 pub fn has_watchers(&self) -> bool {
239 Arc::weak_count(&self.shared) != 0
242 }
243}
244
245impl<T> Drop for Shared<T> {
246 fn drop(&mut self) {
247 let Ok(mut watchers) = self.wakers.lock() else {
248 return; };
250 for watcher in watchers.drain(..) {
255 watcher.wake();
256 }
257 }
258}
259
260pub trait Watcher: Clone {
280 type Value: Clone + Eq;
289
290 fn get(&mut self) -> Self::Value {
305 self.update();
306 self.peek().clone()
307 }
308
309 fn update(&mut self) -> bool;
315
316 fn peek(&self) -> &Self::Value;
325
326 fn is_connected(&self) -> bool;
330
331 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>>;
334
335 fn updated(&mut self) -> NextFut<'_, Self> {
342 NextFut { watcher: self }
343 }
344
345 fn initialized<T, W>(&mut self) -> InitializedFut<'_, T, W, Self>
356 where
357 W: Nullable<T> + Clone,
358 Self: Watcher<Value = W>,
359 {
360 InitializedFut {
361 initial: self.get().into_option(),
362 watcher: self,
363 }
364 }
365
366 fn stream(mut self) -> Stream<Self>
380 where
381 Self: Unpin,
382 {
383 Stream {
384 initial: Some(self.get()),
385 watcher: self,
386 }
387 }
388
389 fn stream_updates_only(self) -> Stream<Self>
404 where
405 Self: Unpin,
406 {
407 Stream {
408 initial: None,
409 watcher: self,
410 }
411 }
412
413 fn map<T: Clone + Eq>(
418 mut self,
419 map: impl Fn(Self::Value) -> T + Send + Sync + 'static,
420 ) -> Map<Self, T> {
421 Map {
422 current: (map)(self.get()),
423 map: Arc::new(map),
424 watcher: self,
425 }
426 }
427
428 fn or<W: Watcher>(self, other: W) -> Tuple<Self, W> {
431 Tuple::new(self, other)
432 }
433}
434
435#[derive(Debug, Clone)]
439pub struct Direct<T> {
440 state: State<T>,
441 shared: Option<Weak<Shared<T>>>,
446}
447
448impl<T: Clone + Eq> Watcher for Direct<T> {
449 type Value = T;
450
451 fn update(&mut self) -> bool {
452 let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
453 self.shared = None; return false;
455 };
456 let state = shared.state();
457 if state.epoch > self.state.epoch {
458 self.state = state.clone();
459 true
460 } else {
461 false
462 }
463 }
464
465 fn peek(&self) -> &Self::Value {
466 &self.state.value
467 }
468
469 fn is_connected(&self) -> bool {
470 self.shared
471 .as_ref()
472 .and_then(|weak| weak.upgrade())
473 .is_some()
474 }
475
476 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
477 let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
478 self.shared = None; return Poll::Ready(Err(Disconnected));
480 };
481 self.state = ready!(shared.poll_updated(cx, self.state.epoch));
482 Poll::Ready(Ok(()))
483 }
484}
485
486#[derive(Debug, Clone)]
487pub struct Tuple<S: Watcher, T: Watcher> {
488 inner: (S, T),
489 current: (S::Value, T::Value),
490}
491
492impl<S: Watcher, T: Watcher> Tuple<S, T> {
493 pub fn new(mut s: S, mut t: T) -> Self {
494 let current = (s.get(), t.get());
495 Self {
496 inner: (s, t),
497 current,
498 }
499 }
500}
501
502impl<S: Watcher, T: Watcher> Watcher for Tuple<S, T> {
503 type Value = (S::Value, T::Value);
504
505 fn update(&mut self) -> bool {
506 let s_updated = self.inner.0.update();
508 let t_updated = self.inner.1.update();
509 let updated = s_updated || t_updated;
510 if updated {
511 self.current = (self.inner.0.peek().clone(), self.inner.1.peek().clone());
512 }
513 updated
514 }
515
516 fn peek(&self) -> &Self::Value {
517 &self.current
518 }
519
520 fn is_connected(&self) -> bool {
521 self.inner.0.is_connected() && self.inner.1.is_connected()
522 }
523
524 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
525 let poll_0 = self.inner.0.poll_updated(cx)?;
526 let poll_1 = self.inner.1.poll_updated(cx)?;
527 if poll_0.is_pending() && poll_1.is_pending() {
528 return Poll::Pending;
529 }
530 if poll_0.is_ready() {
531 self.current.0 = self.inner.0.peek().clone();
532 }
533 if poll_1.is_ready() {
534 self.current.1 = self.inner.1.peek().clone();
535 }
536 Poll::Ready(Ok(()))
537 }
538}
539
540#[derive(Debug, Clone)]
541pub struct Triple<S: Watcher, T: Watcher, U: Watcher> {
542 inner: (S, T, U),
543 current: (S::Value, T::Value, U::Value),
544}
545
546impl<S: Watcher, T: Watcher, U: Watcher> Triple<S, T, U> {
547 pub fn new(mut s: S, mut t: T, mut u: U) -> Self {
548 let current = (s.get(), t.get(), u.get());
549 Self {
550 inner: (s, t, u),
551 current,
552 }
553 }
554}
555
556impl<S: Watcher, T: Watcher, U: Watcher> Watcher for Triple<S, T, U> {
557 type Value = (S::Value, T::Value, U::Value);
558
559 fn update(&mut self) -> bool {
560 let s_updated = self.inner.0.update();
562 let t_updated = self.inner.1.update();
563 let u_updated = self.inner.2.update();
564 let updated = s_updated || t_updated || u_updated;
565 if updated {
566 self.current = (
567 self.inner.0.peek().clone(),
568 self.inner.1.peek().clone(),
569 self.inner.2.peek().clone(),
570 );
571 }
572 updated
573 }
574
575 fn peek(&self) -> &Self::Value {
576 &self.current
577 }
578
579 fn is_connected(&self) -> bool {
580 self.inner.0.is_connected() && self.inner.1.is_connected() && self.inner.2.is_connected()
581 }
582
583 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
584 let poll_0 = self.inner.0.poll_updated(cx)?;
585 let poll_1 = self.inner.1.poll_updated(cx)?;
586 let poll_2 = self.inner.2.poll_updated(cx)?;
587
588 if poll_0.is_pending() && poll_1.is_pending() && poll_2.is_pending() {
589 return Poll::Pending;
590 }
591 if poll_0.is_ready() {
592 self.current.0 = self.inner.0.peek().clone();
593 }
594 if poll_1.is_ready() {
595 self.current.1 = self.inner.1.peek().clone();
596 }
597 if poll_2.is_ready() {
598 self.current.2 = self.inner.2.peek().clone();
599 }
600 Poll::Ready(Ok(()))
601 }
602}
603
604#[derive(Debug, Clone)]
606pub struct Join<T: Clone + Eq, W: Watcher<Value = T>> {
607 watchers: Vec<W>,
609 current: Vec<T>,
610}
611
612impl<T: Clone + Eq, W: Watcher<Value = T>> Join<T, W> {
613 pub fn new(watchers: impl Iterator<Item = W>) -> Self {
615 let mut watchers: Vec<W> = watchers.into_iter().collect();
616
617 let mut current = Vec::with_capacity(watchers.len());
618 for watcher in &mut watchers {
619 current.push(watcher.get());
620 }
621 Self { watchers, current }
622 }
623}
624
625impl<T: Clone + Eq, W: Watcher<Value = T>> Watcher for Join<T, W> {
626 type Value = Vec<T>;
627
628 fn update(&mut self) -> bool {
629 let mut any_updated = false;
630 for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
631 if watcher.update() {
632 any_updated = true;
633 *value = watcher.peek().clone();
634 }
635 }
636 any_updated
637 }
638
639 fn peek(&self) -> &Self::Value {
640 &self.current
641 }
642
643 fn is_connected(&self) -> bool {
644 self.watchers.iter().all(|w| w.is_connected())
645 }
646
647 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
648 let mut any_updated = false;
649 for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
650 if watcher.poll_updated(cx)?.is_ready() {
651 any_updated = true;
652 *value = watcher.peek().clone();
653 }
654 }
655
656 if any_updated {
657 Poll::Ready(Ok(()))
658 } else {
659 Poll::Pending
660 }
661 }
662}
663
664#[derive(derive_more::Debug, Clone)]
668pub struct Map<W: Watcher, T: Clone + Eq> {
669 #[debug("Arc<dyn Fn(W::Value) -> T>")]
670 map: Arc<dyn Fn(W::Value) -> T + Send + Sync + 'static>,
671 watcher: W,
672 current: T,
673}
674
675impl<W: Watcher, T: Clone + Eq> Watcher for Map<W, T> {
676 type Value = T;
677
678 fn update(&mut self) -> bool {
679 if self.watcher.update() {
680 let new = (self.map)(self.watcher.peek().clone());
681 if new != self.current {
682 self.current = new;
683 true
684 } else {
685 false
686 }
687 } else {
688 false
689 }
690 }
691
692 fn peek(&self) -> &Self::Value {
693 &self.current
694 }
695
696 fn is_connected(&self) -> bool {
697 self.watcher.is_connected()
698 }
699
700 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
701 loop {
702 ready!(self.watcher.poll_updated(cx)?);
703 let new = (self.map)(self.watcher.peek().clone());
704 if new != self.current {
705 self.current = new;
706 return Poll::Ready(Ok(()));
707 }
708 }
709 }
710}
711
712#[derive(Debug)]
720pub struct NextFut<'a, W: Watcher> {
721 watcher: &'a mut W,
722}
723
724impl<W: Watcher> Future for NextFut<'_, W> {
725 type Output = Result<W::Value, Disconnected>;
726
727 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
728 ready!(self.watcher.poll_updated(cx))?;
729 Poll::Ready(Ok(self.watcher.peek().clone()))
730 }
731}
732
733#[derive(Debug)]
742pub struct InitializedFut<'a, T, V: Nullable<T> + Clone, W: Watcher<Value = V>> {
743 initial: Option<T>,
744 watcher: &'a mut W,
745}
746
747impl<T: Clone + Eq + Unpin, V: Nullable<T> + Clone, W: Watcher<Value = V> + Unpin> Future
748 for InitializedFut<'_, T, V, W>
749{
750 type Output = T;
751
752 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
753 let mut this = self.as_mut();
754 if let Some(value) = this.initial.take() {
755 return Poll::Ready(value);
756 }
757 loop {
758 if ready!(this.watcher.poll_updated(cx)).is_err() {
759 return Poll::Pending;
761 };
762 let value = this.watcher.peek();
763 if let Some(value) = value.clone().into_option() {
764 return Poll::Ready(value);
765 }
766 }
767 }
768}
769
770#[derive(Debug, Clone)]
778pub struct Stream<W: Watcher + Unpin> {
779 initial: Option<W::Value>,
780 watcher: W,
781}
782
783impl<W: Watcher + Unpin> n0_future::Stream for Stream<W>
784where
785 W::Value: Unpin,
786{
787 type Item = W::Value;
788
789 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
790 if let Some(value) = self.as_mut().initial.take() {
791 return Poll::Ready(Some(value));
792 }
793 match self.as_mut().watcher.poll_updated(cx) {
794 Poll::Ready(Ok(())) => Poll::Ready(Some(self.as_ref().watcher.peek().clone())),
795 Poll::Ready(Err(Disconnected)) => Poll::Ready(None),
796 Poll::Pending => Poll::Pending,
797 }
798 }
799}
800
/// Error reported by a watcher when its underlying `Watchable` has been
/// dropped, so no further updates can arrive.
#[derive(StackError)]
#[error("Watcher lost connection to underlying Watchable, it was dropped")]
pub struct Disconnected;
806
/// Epoch assigned to a freshly created state; every observed change of the
/// value increments the epoch from here.
const INITIAL_EPOCH: u64 = 1;
810
/// State shared between a `Watchable`, its clones and the watchers created
/// from it.
#[derive(Debug, Default)]
struct Shared<T> {
    /// The current value together with its epoch.
    state: RwLock<State<T>>,
    /// Wakers of tasks waiting for the next change.
    wakers: Mutex<VecDeque<Waker>>,
}
818
819#[derive(Debug, Clone)]
820struct State<T> {
821 value: T,
822 epoch: u64,
823}
824
825impl<T: Default> Default for State<T> {
826 fn default() -> Self {
827 Self {
828 value: Default::default(),
829 epoch: INITIAL_EPOCH,
830 }
831 }
832}
833
834impl<T: Clone> Shared<T> {
835 fn get(&self) -> T {
836 self.state.read().expect("poisoned").value.clone()
837 }
838
839 fn state(&self) -> RwLockReadGuard<'_, State<T>> {
840 self.state.read().expect("poisoned")
841 }
842
843 fn poll_updated(&self, cx: &mut task::Context<'_>, last_epoch: u64) -> Poll<State<T>> {
844 {
845 let state = self.state();
846
847 if last_epoch < state.epoch {
850 return Poll::Ready(state.clone());
851 }
852 }
853
854 self.add_waker(cx);
855
856 #[cfg(watcher_loom)]
857 loom::thread::yield_now();
858
859 {
861 let state = self.state();
862
863 if last_epoch < state.epoch {
864 return Poll::Ready(state.clone());
865 }
866 }
867
868 Poll::Pending
869 }
870
871 fn add_waker(&self, cx: &mut task::Context<'_>) {
872 let mut wakers = self.wakers.lock().expect("poisoned");
873 for waker in wakers.iter() {
874 if waker.will_wake(cx.waker()) {
875 return;
876 }
877 }
878 wakers.push_back(cx.waker().clone());
879 }
880}
881
882#[cfg(test)]
883mod tests {
884
885 use n0_future::{future::poll_once, StreamExt};
886 use rand::{rng, Rng};
887 use tokio::{
888 task::JoinSet,
889 time::{Duration, Instant},
890 };
891 use tokio_util::sync::CancellationToken;
892
893 use super::*;
894
895 #[tokio::test]
896 async fn test_watcher() {
897 let cancel = CancellationToken::new();
898 let watchable = Watchable::new(17);
899
900 assert_eq!(watchable.watch().stream().next().await.unwrap(), 17);
901
902 let start = Instant::now();
903 let mut tasks = JoinSet::new();
905 for i in 0..3 {
906 let mut watch = watchable.watch().stream();
907 let cancel = cancel.clone();
908 tasks.spawn(async move {
909 println!("[{i}] spawn");
910 let mut expected_value = 17;
911 loop {
912 tokio::select! {
913 biased;
914 Some(value) = &mut watch.next() => {
915 println!("{:?} [{i}] update: {value}", start.elapsed());
916 assert_eq!(value, expected_value);
917 if expected_value == 17 {
918 expected_value = 0;
919 } else {
920 expected_value += 1;
921 }
922 },
923 _ = cancel.cancelled() => {
924 println!("{:?} [{i}] cancel", start.elapsed());
925 assert_eq!(expected_value, 10);
926 break;
927 }
928 }
929 }
930 });
931 }
932 for i in 0..3 {
933 let mut watch = watchable.watch().stream_updates_only();
934 let cancel = cancel.clone();
935 tasks.spawn(async move {
936 println!("[{i}] spawn");
937 let mut expected_value = 0;
938 loop {
939 tokio::select! {
940 biased;
941 Some(value) = watch.next() => {
942 println!("{:?} [{i}] stream update: {value}", start.elapsed());
943 assert_eq!(value, expected_value);
944 expected_value += 1;
945 },
946 _ = cancel.cancelled() => {
947 println!("{:?} [{i}] cancel", start.elapsed());
948 assert_eq!(expected_value, 10);
949 break;
950 }
951 else => {
952 panic!("stream died");
953 }
954 }
955 }
956 });
957 }
958
959 for next_value in 0..10 {
961 let sleep = Duration::from_nanos(rng().random_range(0..100_000_000));
962 println!("{:?} sleep {sleep:?}", start.elapsed());
963 tokio::time::sleep(sleep).await;
964
965 let changed = watchable.set(next_value);
966 println!("{:?} set {next_value} changed={changed:?}", start.elapsed());
967 }
968
969 println!("cancel");
970 cancel.cancel();
971 while let Some(res) = tasks.join_next().await {
972 res.expect("task failed");
973 }
974 }
975
/// `Watchable::get` reflects the most recently set value.
#[test]
fn test_get() {
    let cell = Watchable::new(None);
    let before = cell.get();
    assert!(before.is_none());

    cell.set(Some(1u8)).ok();
    let after = cell.get();
    assert_eq!(after, Some(1u8));
}
984
985 #[tokio::test]
986 async fn test_initialize() {
987 let watchable = Watchable::new(None);
988
989 let mut watcher = watchable.watch();
990 let mut initialized = watcher.initialized();
991
992 let poll = poll_once(&mut initialized).await;
993 assert!(poll.is_none());
994
995 watchable.set(Some(1u8)).ok();
996
997 let poll = poll_once(&mut initialized).await;
998 assert_eq!(poll.unwrap(), 1u8);
999 }
1000
1001 #[tokio::test]
1002 async fn test_initialize_already_init() {
1003 let watchable = Watchable::new(Some(1u8));
1004
1005 let mut watcher = watchable.watch();
1006 let mut initialized = watcher.initialized();
1007
1008 let poll = poll_once(&mut initialized).await;
1009 assert_eq!(poll.unwrap(), 1u8);
1010 }
1011
/// `initialized()` resolves even when the value is set concurrently from
/// another thread (checked with loom when `watcher_loom` is enabled).
#[test]
fn test_initialized_always_resolves() {
    #[cfg(not(watcher_loom))]
    use std::thread;

    #[cfg(watcher_loom)]
    use loom::thread;

    let scenario = || {
        let watchable = Watchable::<Option<u8>>::new(None);

        let mut watch = watchable.watch();
        let waiter = thread::spawn(move || n0_future::future::block_on(watch.initialized()));

        watchable.set(Some(42)).ok();

        thread::yield_now();

        let observed: u8 = waiter.join().unwrap();

        assert_eq!(observed, 42);
    };

    #[cfg(watcher_loom)]
    loom::model(scenario);
    #[cfg(not(watcher_loom))]
    scenario();
}
1040
1041 #[tokio::test(flavor = "multi_thread")]
1042 async fn test_update_cancel_safety() {
1043 let watchable = Watchable::new(0);
1044 let mut watch = watchable.watch();
1045 const MAX: usize = 100_000;
1046
1047 let handle = tokio::spawn(async move {
1048 let mut last_observed = 0;
1049
1050 while last_observed != MAX {
1051 tokio::select! {
1052 val = watch.updated() => {
1053 let Ok(val) = val else {
1054 return;
1055 };
1056
1057 assert_ne!(val, last_observed, "never observe the same value twice, even with cancellation");
1058 last_observed = val;
1059 }
1060 _ = tokio::time::sleep(Duration::from_micros(rng().random_range(0..10_000))) => {
1061 continue;
1063 }
1064 }
1065 }
1066 });
1067
1068 for i in 1..=MAX {
1069 watchable.set(i).ok();
1070 if rng().random_bool(0.2) {
1071 tokio::task::yield_now().await;
1072 }
1073 }
1074
1075 tokio::time::timeout(Duration::from_secs(10), handle)
1076 .await
1077 .unwrap()
1078 .unwrap()
1079 }
1080
1081 #[tokio::test]
1082 async fn test_join_simple() {
1083 let a = Watchable::new(1u8);
1084 let b = Watchable::new(1u8);
1085
1086 let mut ab = Join::new([a.watch(), b.watch()].into_iter());
1087
1088 let stream = ab.clone().stream();
1089 let handle = tokio::task::spawn(async move { stream.take(5).collect::<Vec<_>>().await });
1090
1091 assert_eq!(ab.get(), vec![1, 1]);
1093 a.set(2u8).unwrap();
1095 tokio::task::yield_now().await;
1096 assert_eq!(ab.get(), vec![2, 1]);
1097 b.set(3u8).unwrap();
1099 tokio::task::yield_now().await;
1100 assert_eq!(ab.get(), vec![2, 3]);
1101
1102 a.set(3u8).unwrap();
1103 tokio::task::yield_now().await;
1104 b.set(4u8).unwrap();
1105 tokio::task::yield_now().await;
1106
1107 let values = tokio::time::timeout(Duration::from_secs(5), handle)
1108 .await
1109 .unwrap()
1110 .unwrap();
1111 assert_eq!(
1112 values,
1113 vec![vec![1, 1], vec![2, 1], vec![2, 3], vec![3, 3], vec![3, 4]]
1114 );
1115 }
1116
1117 #[tokio::test]
1118 async fn test_updated_then_disconnect_then_get() {
1119 let watchable = Watchable::new(10);
1120 let mut watcher = watchable.watch();
1121 assert_eq!(watchable.get(), 10);
1122 watchable.set(42).ok();
1123 assert_eq!(watcher.updated().await.unwrap(), 42);
1124 drop(watchable);
1125 assert_eq!(watcher.get(), 42);
1126 }
1127
1128 #[tokio::test(start_paused = true)]
1129 async fn test_update_wakeup_on_watchable_drop() {
1130 let watchable = Watchable::new(10);
1131 let mut watcher = watchable.watch();
1132
1133 let start = Instant::now();
1134 let (_, result) = tokio::time::timeout(Duration::from_secs(2), async move {
1135 tokio::join!(
1136 async move {
1137 tokio::time::sleep(Duration::from_secs(1)).await;
1138 drop(watchable);
1139 },
1140 async move { watcher.updated().await }
1141 )
1142 })
1143 .await
1144 .expect("watcher never updated");
1145 assert_eq!(start.elapsed(), Duration::from_secs(1));
1148 assert!(result.is_err());
1149 }
1150
1151 #[tokio::test(start_paused = true)]
1152 async fn test_update_wakeup_always_a_change() {
1153 let watchable = Watchable::new(10);
1154 let mut watcher = watchable.watch();
1155
1156 let task = tokio::spawn(async move {
1157 let mut last_value = watcher.get();
1158 let mut values = Vec::new();
1159 while let Ok(value) = watcher.updated().await {
1160 values.push(value);
1161 if last_value == value {
1162 return Err("value duplicated");
1163 }
1164 last_value = value;
1165 }
1166 Ok(values)
1167 });
1168
1169 tokio::time::sleep(Duration::from_millis(100)).await;
1171
1172 watchable.set(11).ok();
1173 tokio::time::sleep(Duration::from_millis(100)).await;
1174 let clone = watchable.clone();
1175 drop(clone); tokio::time::sleep(Duration::from_millis(100)).await;
1177 for i in 1..=10 {
1178 watchable.set(i + 11).ok();
1179 tokio::time::sleep(Duration::from_millis(100)).await;
1180 }
1181 drop(watchable);
1182
1183 let values = task
1184 .await
1185 .expect("task panicked")
1186 .expect("value duplicated");
1187 assert_eq!(values, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]);
1188 }
1189
/// `has_watchers` counts watchers only, not clones of the watchable itself.
#[test]
fn test_has_watchers() {
    let first = Watchable::new(1u8);
    assert!(!first.has_watchers());

    // A clone of the watchable is not a watcher.
    let second = first.clone();
    assert!(!first.has_watchers());
    assert!(!second.has_watchers());

    let watcher = first.watch();
    assert!(first.has_watchers());
    assert!(second.has_watchers());

    drop(watcher);

    assert!(!first.has_watchers());
    assert!(!second.has_watchers());
}
1207
1208 #[tokio::test]
1209 async fn test_three_watchers_basic() {
1210 let watchable = Watchable::new(1u8);
1211
1212 let mut w1 = watchable.watch();
1213 let mut w2 = watchable.watch();
1214 let mut w3 = watchable.watch();
1215
1216 assert_eq!(w1.get(), 1);
1219 assert_eq!(w2.get(), 1);
1220 assert_eq!(w3.get(), 1);
1221
1222 watchable.set(42).unwrap();
1224
1225 assert_eq!(w1.updated().await.unwrap(), 42);
1227 assert_eq!(w2.updated().await.unwrap(), 42);
1228 assert_eq!(w3.updated().await.unwrap(), 42);
1229 }
1230
1231 #[tokio::test]
1232 async fn test_three_watchers_skip_intermediate() {
1233 let watchable = Watchable::new(0u8);
1234 let mut watcher = watchable.watch();
1235
1236 watchable.set(1).ok();
1237 watchable.set(2).ok();
1238 watchable.set(3).ok();
1239 watchable.set(4).ok();
1240
1241 let value = watcher.updated().await.unwrap();
1242
1243 assert_eq!(value, 4);
1244 }
1245
1246 #[tokio::test]
1247 async fn test_three_watchers_with_streams() {
1248 let watchable = Watchable::new(10u8);
1249
1250 let mut stream1 = watchable.watch().stream();
1251 let mut stream2 = watchable.watch().stream();
1252 let mut stream3 = watchable.watch().stream_updates_only();
1253
1254 assert_eq!(stream1.next().await.unwrap(), 10);
1255 assert_eq!(stream2.next().await.unwrap(), 10);
1256
1257 watchable.set(20).ok();
1259
1260 assert_eq!(stream1.next().await.unwrap(), 20);
1262 assert_eq!(stream2.next().await.unwrap(), 20);
1263 assert_eq!(stream3.next().await.unwrap(), 20);
1264 }
1265
1266 #[tokio::test]
1267 async fn test_three_watchers_independent() {
1268 let watchable = Watchable::new(0u8);
1269
1270 let mut fast_watcher = watchable.watch();
1271 let mut slow_watcher = watchable.watch();
1272 let mut lazy_watcher = watchable.watch();
1273
1274 watchable.set(1).ok();
1275 assert_eq!(fast_watcher.updated().await.unwrap(), 1);
1276
1277 watchable.set(2).ok();
1279 watchable.set(3).ok();
1280
1281 assert_eq!(slow_watcher.updated().await.unwrap(), 3);
1282 assert_eq!(lazy_watcher.get(), 3);
1283 }
1284
1285 #[tokio::test]
1286 async fn test_combine_three_watchers() {
1287 let a = Watchable::new(1u8);
1288 let b = Watchable::new(2u8);
1289 let c = Watchable::new(3u8);
1290
1291 let mut combined = Triple::new(a.watch(), b.watch(), c.watch());
1292
1293 assert_eq!(combined.get(), (1, 2, 3));
1294
1295 b.set(20).ok();
1297
1298 assert_eq!(combined.updated().await.unwrap(), (1, 20, 3));
1299
1300 c.set(30).ok();
1301 assert_eq!(combined.updated().await.unwrap(), (1, 20, 30));
1302 }
1303
1304 #[tokio::test]
1305 async fn test_three_watchers_disconnection() {
1306 let watchable = Watchable::new(5u8);
1307
1308 let mut w1 = watchable.watch();
1310 let mut w2 = watchable.watch();
1311 let mut w3 = watchable.watch();
1312
1313 drop(watchable);
1315
1316 assert!(!w1.is_connected());
1318 assert!(!w2.is_connected());
1319 assert!(!w3.is_connected());
1320
1321 assert_eq!(w1.get(), 5);
1323 assert_eq!(w2.get(), 5);
1324
1325 assert!(w3.updated().await.is_err());
1327 }
1328
1329 #[tokio::test]
1330 async fn test_three_watchers_truly_concurrent() {
1331 use tokio::time::sleep;
1332 let watchable = Watchable::new(0u8);
1333
1334 let mut reader_handles = vec![];
1336 for i in 0..3 {
1337 let mut watcher = watchable.watch();
1338 let handle = tokio::spawn(async move {
1339 let mut values = vec![];
1340 for _ in 0..5 {
1342 if let Ok(value) = watcher.updated().await {
1343 values.push(value);
1344 } else {
1345 break;
1346 }
1347 }
1348 (i, values)
1349 });
1350 reader_handles.push(handle);
1351 }
1352
1353 let mut writer_handles = vec![];
1355 for i in 0..3 {
1356 let watchable_clone = watchable.clone();
1357 let handle = tokio::spawn(async move {
1358 for j in 0..5 {
1359 let value = (i * 10) + j;
1360 watchable_clone.set(value).ok();
1361 sleep(Duration::from_millis(5)).await;
1362 }
1363 });
1364 writer_handles.push(handle);
1365 }
1366
1367 for handle in writer_handles {
1369 handle.await.unwrap();
1370 }
1371
1372 for handle in reader_handles {
1374 let (task_id, values) = handle.await.unwrap();
1375 println!("Reader {}: saw values {:?}", task_id, values);
1376 assert!(!values.is_empty());
1377 }
1378 }
1379
1380 #[tokio::test]
1381 async fn test_peek() {
1382 let a = Watchable::new(vec![1, 2, 3]);
1383 let mut wa = a.watch();
1384
1385 assert_eq!(wa.get(), vec![1, 2, 3]);
1386 assert_eq!(wa.peek(), &vec![1, 2, 3]);
1387
1388 let mut wa_map = wa.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1389
1390 assert_eq!(wa_map.get(), vec![2, 4, 6]);
1391 assert_eq!(wa_map.peek(), &vec![2, 4, 6]);
1392
1393 let mut wb = a.watch();
1394
1395 assert_eq!(wb.get(), vec![1, 2, 3]);
1396 assert_eq!(wb.peek(), &vec![1, 2, 3]);
1397
1398 let mut wb_map = wb.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1399
1400 assert_eq!(wb_map.get(), vec![2, 4, 6]);
1401 assert_eq!(wb_map.peek(), &vec![2, 4, 6]);
1402
1403 let mut w_join = Join::new([wa_map, wb_map].into_iter());
1404
1405 assert_eq!(w_join.get(), vec![vec![2, 4, 6], vec![2, 4, 6]]);
1406 assert_eq!(w_join.peek(), &vec![vec![2, 4, 6], vec![2, 4, 6]]);
1407 }
1408
1409 #[tokio::test]
1410 async fn test_update_updates_peek() {
1411 let value = Watchable::new(42);
1412 let mut watcher = value.watch();
1413
1414 assert_eq!(watcher.peek(), &42);
1415 assert!(!watcher.update());
1416
1417 value.set(50).ok();
1418
1419 assert_eq!(watcher.peek(), &42); assert!(watcher.update()); assert_eq!(watcher.peek(), &50);
1422 assert!(!watcher.update());
1423
1424 let mut watcher_map = watcher.clone().map(|v| v * 2);
1425
1426 assert_eq!(watcher_map.peek(), &100);
1427 assert!(!watcher_map.update());
1428
1429 value.set(10).ok();
1430
1431 assert_eq!(watcher_map.peek(), &100);
1432 assert!(watcher_map.update());
1433 assert_eq!(watcher_map.peek(), &20);
1434 assert!(!watcher_map.update());
1435
1436 let value2 = Watchable::new(0);
1437 let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
1438
1439 assert_eq!(watcher_join.peek(), &vec![10, 0]);
1440 assert!(!watcher_join.update());
1441
1442 value.set(0).ok();
1443 value2.set(1).ok();
1444
1445 assert_eq!(watcher_join.peek(), &vec![10, 0]);
1446 assert!(watcher_join.update());
1447 assert_eq!(watcher_join.peek(), &vec![0, 1]);
1448 assert!(!watcher_join.update());
1449 }
1450
1451 #[tokio::test]
1452 async fn test_get_updates_peek() {
1453 let value = Watchable::new(42);
1454 let mut watcher = value.watch();
1455
1456 assert_eq!(watcher.peek(), &42);
1457 assert!(!watcher.update());
1458
1459 value.set(50).ok();
1460
1461 assert_eq!(watcher.peek(), &42); assert_eq!(watcher.get(), 50); assert_eq!(watcher.peek(), &50);
1464 assert!(!watcher.update());
1465
1466 let mut watcher_map = watcher.clone().map(|v| v * 2);
1467
1468 assert_eq!(watcher_map.peek(), &100);
1469 assert!(!watcher_map.update());
1470
1471 value.set(10).ok();
1472
1473 assert_eq!(watcher_map.peek(), &100);
1474 assert_eq!(watcher_map.get(), 20);
1475 assert_eq!(watcher_map.peek(), &20);
1476 assert!(!watcher_map.update());
1477
1478 let value2 = Watchable::new(0);
1479 let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());
1480
1481 assert_eq!(watcher_join.peek(), &vec![10, 0]);
1482 assert!(!watcher_join.update());
1483
1484 value.set(0).ok();
1485 value2.set(1).ok();
1486
1487 assert_eq!(watcher_join.peek(), &vec![10, 0]);
1488 assert_eq!(watcher_join.get(), vec![0, 1]);
1489 assert_eq!(watcher_join.peek(), &vec![0, 1]);
1490 assert!(!watcher_join.update());
1491 }
1492
1493 #[tokio::test]
1494 async fn test_modify_cas() {
1495 let watchable = Watchable::new(0u32);
1496 let mut watcher = watchable.watch();
1497
1498 let swapped = watchable.modify(|v| {
1500 if *v == 0 {
1501 *v = 1;
1502 true
1503 } else {
1504 false
1505 }
1506 });
1507 assert!(swapped);
1508 assert_eq!(watchable.get(), 1);
1509 assert_eq!(watcher.updated().await.unwrap(), 1);
1510
1511 let swapped = watchable.modify(|v| {
1513 if *v == 0 {
1514 *v = 2;
1515 true
1516 } else {
1517 false
1518 }
1519 });
1520 assert!(!swapped);
1521 assert_eq!(watchable.get(), 1);
1522 assert!(poll_once(&mut watcher.updated()).await.is_none());
1523
1524 watchable.modify(|v| *v = 1);
1526 assert_eq!(watchable.get(), 1);
1527 assert!(poll_once(&mut watcher.updated()).await.is_none());
1528
1529 let prev = watchable.modify(|v| {
1531 let prev = *v;
1532 *v += 10;
1533 prev
1534 });
1535 assert_eq!(prev, 1);
1536 assert_eq!(watcher.updated().await.unwrap(), 11);
1537 }
1538
1539 #[tokio::test]
1540 async fn test_ensure_wakers_bounded() {
1541 use tokio::time::{interval, Duration};
1542 let watchable = Watchable::new(0);
1543 let mut watcher = watchable.watch();
1544 let max_tick = 1000;
1545
1546 let handle = tokio::spawn(async move {
1547 let mut ticker = interval(Duration::from_nanos(1));
1548 let mut tick_no = 0;
1549 loop {
1550 tokio::select! {
1551 _ = watcher.updated() => {}
1552 _ = ticker.tick() => {
1553 tick_no += 1;
1555 if tick_no > max_tick{
1556 return
1557 }
1558 }
1559 }
1560 let num_wakers = watchable.shared.wakers.lock().unwrap().len();
1561 assert_eq!(num_wakers, 1);
1562 }
1563 });
1564
1565 tokio::time::timeout(Duration::from_secs(1), handle)
1566 .await
1567 .unwrap()
1568 .unwrap()
1569 }
1570}