1#[cfg(not(watcher_loom))]
78use std::sync;
79use std::{
80 collections::VecDeque,
81 future::Future,
82 pin::Pin,
83 sync::{Arc, RwLockReadGuard, Weak},
84 task::{self, ready, Poll, Waker},
85};
86
87#[cfg(watcher_loom)]
88use loom::sync;
89use n0_error::StackError;
90use sync::{Mutex, RwLock};
91
/// A value that can be observed for changes.
///
/// The state lives behind an [`Arc`], so every clone of a `Watchable` refers to the same
/// value. Watchers created with [`Watchable::watch`] only keep a weak reference to it.
#[derive(Debug, Default)]
pub struct Watchable<T> {
    /// The reference-counted state: current value, epoch and registered wakers.
    shared: Arc<Shared<T>>,
}
100
101impl<T> Clone for Watchable<T> {
102 fn clone(&self) -> Self {
103 Self {
104 shared: self.shared.clone(),
105 }
106 }
107}
108
/// Abstraction over container types that may or may not hold a `T`.
pub trait Nullable<T> {
    /// Consumes the container and returns the value it holds, if any.
    fn into_option(self) -> Option<T>;
}

impl<T> Nullable<T> for Option<T> {
    /// An `Option` already has the target shape and is returned unchanged.
    fn into_option(self) -> Option<T> {
        self
    }
}

impl<T> Nullable<T> for Vec<T> {
    /// Returns the *last* element of the vector, or `None` if the vector is empty.
    fn into_option(self) -> Option<T> {
        self.into_iter().next_back()
    }
}
126
127impl<T: Clone + Eq> Watchable<T> {
128 pub fn new(value: T) -> Self {
130 Self {
131 shared: Arc::new(Shared {
132 state: RwLock::new(State {
133 value,
134 epoch: INITIAL_EPOCH,
135 }),
136 watchers: Default::default(),
137 }),
138 }
139 }
140
141 pub fn set(&self, value: T) -> Result<T, T> {
148 let mut state = self.shared.state.write().expect("poisoned");
152
153 let changed = state.value != value;
155
156 let ret = if changed {
157 let old = std::mem::replace(&mut state.value, value);
158 state.epoch += 1;
159 Ok(old)
160 } else {
161 Err(value)
162 };
163 drop(state); if changed {
167 for watcher in self.shared.watchers.lock().expect("poisoned").drain(..) {
168 watcher.wake();
169 }
170 }
171 ret
172 }
173
174 pub fn watch(&self) -> Direct<T> {
176 Direct {
177 state: self.shared.state().clone(),
178 shared: Some(Arc::downgrade(&self.shared)),
179 }
180 }
181
182 pub fn get(&self) -> T {
184 self.shared.get()
185 }
186
187 pub fn has_watchers(&self) -> bool {
190 Arc::weak_count(&self.shared) != 0
193 }
194}
195
196impl<T> Drop for Shared<T> {
197 fn drop(&mut self) {
198 let Ok(mut watchers) = self.watchers.lock() else {
199 return; };
201 for watcher in watchers.drain(..) {
206 watcher.wake();
207 }
208 }
209}
210
211pub trait Watcher: Clone {
231 type Value: Clone + Eq;
240
241 fn get(&mut self) -> Self::Value {
256 self.update();
257 self.peek().clone()
258 }
259
260 fn update(&mut self) -> bool;
266
267 fn peek(&self) -> &Self::Value;
276
277 fn is_connected(&self) -> bool;
281
282 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>>;
285
286 fn updated(&mut self) -> NextFut<'_, Self> {
293 NextFut { watcher: self }
294 }
295
296 fn initialized<T, W>(&mut self) -> InitializedFut<'_, T, W, Self>
307 where
308 W: Nullable<T> + Clone,
309 Self: Watcher<Value = W>,
310 {
311 InitializedFut {
312 initial: self.get().into_option(),
313 watcher: self,
314 }
315 }
316
317 fn stream(mut self) -> Stream<Self>
331 where
332 Self: Unpin,
333 {
334 Stream {
335 initial: Some(self.get()),
336 watcher: self,
337 }
338 }
339
340 fn stream_updates_only(self) -> Stream<Self>
355 where
356 Self: Unpin,
357 {
358 Stream {
359 initial: None,
360 watcher: self,
361 }
362 }
363
364 fn map<T: Clone + Eq>(
369 mut self,
370 map: impl Fn(Self::Value) -> T + Send + Sync + 'static,
371 ) -> Map<Self, T> {
372 Map {
373 current: (map)(self.get()),
374 map: Arc::new(map),
375 watcher: self,
376 }
377 }
378
379 fn or<W: Watcher>(self, other: W) -> Tuple<Self, W> {
382 Tuple::new(self, other)
383 }
384}
385
/// A watcher that observes a single [`Watchable`] directly.
#[derive(Debug, Clone)]
pub struct Direct<T> {
    /// Snapshot (value and epoch) of the state this watcher has seen last.
    state: State<T>,
    /// Weak handle to the shared state; reset to `None` once an upgrade has failed.
    shared: Option<Weak<Shared<T>>>,
}
398
399impl<T: Clone + Eq> Watcher for Direct<T> {
400 type Value = T;
401
402 fn update(&mut self) -> bool {
403 let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
404 self.shared = None; return false;
406 };
407 let state = shared.state();
408 if state.epoch > self.state.epoch {
409 self.state = state.clone();
410 true
411 } else {
412 false
413 }
414 }
415
416 fn peek(&self) -> &Self::Value {
417 &self.state.value
418 }
419
420 fn is_connected(&self) -> bool {
421 self.shared
422 .as_ref()
423 .and_then(|weak| weak.upgrade())
424 .is_some()
425 }
426
427 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
428 let Some(shared) = self.shared.as_ref().and_then(|weak| weak.upgrade()) else {
429 self.shared = None; return Poll::Ready(Err(Disconnected));
431 };
432 self.state = ready!(shared.poll_updated(cx, self.state.epoch));
433 Poll::Ready(Ok(()))
434 }
435}
436
/// A watcher that combines two watchers into one over the pair of their values.
#[derive(Debug, Clone)]
pub struct Tuple<S: Watcher, T: Watcher> {
    /// The two wrapped watchers.
    inner: (S, T),
    /// Cached pair of the values last taken from the wrapped watchers.
    current: (S::Value, T::Value),
}
442
443impl<S: Watcher, T: Watcher> Tuple<S, T> {
444 pub fn new(mut s: S, mut t: T) -> Self {
445 let current = (s.get(), t.get());
446 Self {
447 inner: (s, t),
448 current,
449 }
450 }
451}
452
453impl<S: Watcher, T: Watcher> Watcher for Tuple<S, T> {
454 type Value = (S::Value, T::Value);
455
456 fn update(&mut self) -> bool {
457 let s_updated = self.inner.0.update();
459 let t_updated = self.inner.1.update();
460 let updated = s_updated || t_updated;
461 if updated {
462 self.current = (self.inner.0.peek().clone(), self.inner.1.peek().clone());
463 }
464 updated
465 }
466
467 fn peek(&self) -> &Self::Value {
468 &self.current
469 }
470
471 fn is_connected(&self) -> bool {
472 self.inner.0.is_connected() && self.inner.1.is_connected()
473 }
474
475 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
476 let poll_0 = self.inner.0.poll_updated(cx)?;
477 let poll_1 = self.inner.1.poll_updated(cx)?;
478 if poll_0.is_pending() && poll_1.is_pending() {
479 return Poll::Pending;
480 }
481 if poll_0.is_ready() {
482 self.current.0 = self.inner.0.peek().clone();
483 }
484 if poll_1.is_ready() {
485 self.current.1 = self.inner.1.peek().clone();
486 }
487 Poll::Ready(Ok(()))
488 }
489}
490
/// A watcher that combines three watchers into one over the triple of their values.
#[derive(Debug, Clone)]
pub struct Triple<S: Watcher, T: Watcher, U: Watcher> {
    /// The three wrapped watchers.
    inner: (S, T, U),
    /// Cached triple of the values last taken from the wrapped watchers.
    current: (S::Value, T::Value, U::Value),
}
496
497impl<S: Watcher, T: Watcher, U: Watcher> Triple<S, T, U> {
498 pub fn new(mut s: S, mut t: T, mut u: U) -> Self {
499 let current = (s.get(), t.get(), u.get());
500 Self {
501 inner: (s, t, u),
502 current,
503 }
504 }
505}
506
507impl<S: Watcher, T: Watcher, U: Watcher> Watcher for Triple<S, T, U> {
508 type Value = (S::Value, T::Value, U::Value);
509
510 fn update(&mut self) -> bool {
511 let s_updated = self.inner.0.update();
513 let t_updated = self.inner.1.update();
514 let u_updated = self.inner.2.update();
515 let updated = s_updated || t_updated || u_updated;
516 if updated {
517 self.current = (
518 self.inner.0.peek().clone(),
519 self.inner.1.peek().clone(),
520 self.inner.2.peek().clone(),
521 );
522 }
523 updated
524 }
525
526 fn peek(&self) -> &Self::Value {
527 &self.current
528 }
529
530 fn is_connected(&self) -> bool {
531 self.inner.0.is_connected() && self.inner.1.is_connected() && self.inner.2.is_connected()
532 }
533
534 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
535 let poll_0 = self.inner.0.poll_updated(cx)?;
536 let poll_1 = self.inner.1.poll_updated(cx)?;
537 let poll_2 = self.inner.2.poll_updated(cx)?;
538
539 if poll_0.is_pending() && poll_1.is_pending() && poll_2.is_pending() {
540 return Poll::Pending;
541 }
542 if poll_0.is_ready() {
543 self.current.0 = self.inner.0.peek().clone();
544 }
545 if poll_1.is_ready() {
546 self.current.1 = self.inner.1.peek().clone();
547 }
548 if poll_2.is_ready() {
549 self.current.2 = self.inner.2.peek().clone();
550 }
551 Poll::Ready(Ok(()))
552 }
553}
554
/// A watcher that combines any number of watchers of the same type into one watcher
/// over the vector of their values.
#[derive(Debug, Clone)]
pub struct Join<T: Clone + Eq, W: Watcher<Value = T>> {
    /// The wrapped watchers.
    watchers: Vec<W>,
    /// Cached values, stored in the same order as `watchers`.
    current: Vec<T>,
}
562
563impl<T: Clone + Eq, W: Watcher<Value = T>> Join<T, W> {
564 pub fn new(watchers: impl Iterator<Item = W>) -> Self {
566 let mut watchers: Vec<W> = watchers.into_iter().collect();
567
568 let mut current = Vec::with_capacity(watchers.len());
569 for watcher in &mut watchers {
570 current.push(watcher.get());
571 }
572 Self { watchers, current }
573 }
574}
575
576impl<T: Clone + Eq, W: Watcher<Value = T>> Watcher for Join<T, W> {
577 type Value = Vec<T>;
578
579 fn update(&mut self) -> bool {
580 let mut any_updated = false;
581 for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
582 if watcher.update() {
583 any_updated = true;
584 *value = watcher.peek().clone();
585 }
586 }
587 any_updated
588 }
589
590 fn peek(&self) -> &Self::Value {
591 &self.current
592 }
593
594 fn is_connected(&self) -> bool {
595 self.watchers.iter().all(|w| w.is_connected())
596 }
597
598 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
599 let mut any_updated = false;
600 for (value, watcher) in self.current.iter_mut().zip(self.watchers.iter_mut()) {
601 if watcher.poll_updated(cx)?.is_ready() {
602 any_updated = true;
603 *value = watcher.peek().clone();
604 }
605 }
606
607 if any_updated {
608 Poll::Ready(Ok(()))
609 } else {
610 Poll::Pending
611 }
612 }
613}
614
/// A watcher whose value is derived from another watcher through a mapping function.
#[derive(derive_more::Debug, Clone)]
pub struct Map<W: Watcher, T: Clone + Eq> {
    /// The mapping function; shared between clones of this watcher.
    #[debug("Arc<dyn Fn(W::Value) -> T>")]
    map: Arc<dyn Fn(W::Value) -> T + Send + Sync + 'static>,
    /// The wrapped watcher that provides the input values.
    watcher: W,
    /// The mapped value computed last.
    current: T,
}
625
626impl<W: Watcher, T: Clone + Eq> Watcher for Map<W, T> {
627 type Value = T;
628
629 fn update(&mut self) -> bool {
630 if self.watcher.update() {
631 let new = (self.map)(self.watcher.peek().clone());
632 if new != self.current {
633 self.current = new;
634 true
635 } else {
636 false
637 }
638 } else {
639 false
640 }
641 }
642
643 fn peek(&self) -> &Self::Value {
644 &self.current
645 }
646
647 fn is_connected(&self) -> bool {
648 self.watcher.is_connected()
649 }
650
651 fn poll_updated(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Disconnected>> {
652 loop {
653 ready!(self.watcher.poll_updated(cx)?);
654 let new = (self.map)(self.watcher.peek().clone());
655 if new != self.current {
656 self.current = new;
657 return Poll::Ready(Ok(()));
658 }
659 }
660 }
661}
662
/// Future returned by [`Watcher::updated`]; resolves with the watcher's next value.
#[derive(Debug)]
pub struct NextFut<'a, W: Watcher> {
    /// The watcher being polled for its next update.
    watcher: &'a mut W,
}
674
675impl<W: Watcher> Future for NextFut<'_, W> {
676 type Output = Result<W::Value, Disconnected>;
677
678 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
679 ready!(self.watcher.poll_updated(cx))?;
680 Poll::Ready(Ok(self.watcher.peek().clone()))
681 }
682}
683
/// Future returned by [`Watcher::initialized`]; resolves once the watched [`Nullable`]
/// holds a value.
#[derive(Debug)]
pub struct InitializedFut<'a, T, V: Nullable<T> + Clone, W: Watcher<Value = V>> {
    /// Value that was already present when the future was created, if any.
    initial: Option<T>,
    /// The watcher polled for further updates.
    watcher: &'a mut W,
}
697
698impl<T: Clone + Eq + Unpin, V: Nullable<T> + Clone, W: Watcher<Value = V> + Unpin> Future
699 for InitializedFut<'_, T, V, W>
700{
701 type Output = T;
702
703 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
704 let mut this = self.as_mut();
705 if let Some(value) = this.initial.take() {
706 return Poll::Ready(value);
707 }
708 loop {
709 if ready!(this.watcher.poll_updated(cx)).is_err() {
710 return Poll::Pending;
712 };
713 let value = this.watcher.peek();
714 if let Some(value) = value.clone().into_option() {
715 return Poll::Ready(value);
716 }
717 }
718 }
719}
720
/// Stream of values produced by a watcher; see [`Watcher::stream`] and
/// [`Watcher::stream_updates_only`].
#[derive(Debug, Clone)]
pub struct Stream<W: Watcher + Unpin> {
    /// Value to yield before any update; `None` once yielded or if updates only were requested.
    initial: Option<W::Value>,
    /// The watcher providing the updates.
    watcher: W,
}
733
734impl<W: Watcher + Unpin> n0_future::Stream for Stream<W>
735where
736 W::Value: Unpin,
737{
738 type Item = W::Value;
739
740 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
741 if let Some(value) = self.as_mut().initial.take() {
742 return Poll::Ready(Some(value));
743 }
744 match self.as_mut().watcher.poll_updated(cx) {
745 Poll::Ready(Ok(())) => Poll::Ready(Some(self.as_ref().watcher.peek().clone())),
746 Poll::Ready(Err(Disconnected)) => Poll::Ready(None),
747 Poll::Pending => Poll::Pending,
748 }
749 }
750}
751
/// Error reported by a watcher when the shared state it observes no longer exists.
#[derive(StackError)]
#[error("Watcher lost connection to underlying Watchable, it was dropped")]
pub struct Disconnected;
757
/// Epoch given to a freshly created state; `Watchable::set` increments the epoch on
/// every change from there.
const INITIAL_EPOCH: u64 = 1;
761
/// State shared between all clones of a `Watchable` and, weakly, its watchers.
#[derive(Debug, Default)]
struct Shared<T> {
    /// The current value together with its epoch.
    state: RwLock<State<T>>,
    /// Wakers of tasks waiting for the next change; drained whenever they are woken.
    watchers: Mutex<VecDeque<Waker>>,
}
769
/// A value paired with the epoch at which it was stored.
#[derive(Debug, Clone)]
struct State<T> {
    /// The watched value.
    value: T,
    /// Counter that `Watchable::set` increments on every change of `value`.
    epoch: u64,
}
775
776impl<T: Default> Default for State<T> {
777 fn default() -> Self {
778 Self {
779 value: Default::default(),
780 epoch: INITIAL_EPOCH,
781 }
782 }
783}
784
785impl<T: Clone> Shared<T> {
786 fn get(&self) -> T {
787 self.state.read().expect("poisoned").value.clone()
788 }
789
790 fn state(&self) -> RwLockReadGuard<'_, State<T>> {
791 self.state.read().expect("poisoned")
792 }
793
794 fn poll_updated(&self, cx: &mut task::Context<'_>, last_epoch: u64) -> Poll<State<T>> {
795 {
796 let state = self.state();
797
798 if last_epoch < state.epoch {
801 return Poll::Ready(state.clone());
802 }
803 }
804
805 self.watchers
806 .lock()
807 .expect("poisoned")
808 .push_back(cx.waker().to_owned());
809
810 #[cfg(watcher_loom)]
811 loom::thread::yield_now();
812
813 {
815 let state = self.state();
816
817 if last_epoch < state.epoch {
818 return Poll::Ready(state.clone());
819 }
820 }
821
822 Poll::Pending
823 }
824}
825
826#[cfg(test)]
827mod tests {
828
829 use n0_future::{future::poll_once, StreamExt};
830 use rand::{rng, Rng};
831 use tokio::{
832 task::JoinSet,
833 time::{Duration, Instant},
834 };
835 use tokio_util::sync::CancellationToken;
836
837 use super::*;
838
    /// Spawns three tasks reading full streams and three reading update-only streams,
    /// then sets the values `0..10` and checks every task saw each value in order.
    #[tokio::test]
    async fn test_watcher() {
        let cancel = CancellationToken::new();
        let watchable = Watchable::new(17);

        // A fresh stream yields the current value first.
        assert_eq!(watchable.watch().stream().next().await.unwrap(), 17);

        let start = Instant::now();
        let mut tasks = JoinSet::new();
        // Streams that include the initial value: expect 17, then 0, 1, 2, ...
        for i in 0..3 {
            let mut watch = watchable.watch().stream();
            let cancel = cancel.clone();
            tasks.spawn(async move {
                println!("[{i}] spawn");
                let mut expected_value = 17;
                loop {
                    tokio::select! {
                        biased;
                        Some(value) = &mut watch.next() => {
                            println!("{:?} [{i}] update: {value}", start.elapsed());
                            assert_eq!(value, expected_value);
                            if expected_value == 17 {
                                expected_value = 0;
                            } else {
                                expected_value += 1;
                            }
                        },
                        _ = cancel.cancelled() => {
                            println!("{:?} [{i}] cancel", start.elapsed());
                            // All ten values (0..10) must have been seen by now.
                            assert_eq!(expected_value, 10);
                            break;
                        }
                    }
                }
            });
        }
        // Streams without the initial value: expect 0, 1, 2, ...
        for i in 0..3 {
            let mut watch = watchable.watch().stream_updates_only();
            let cancel = cancel.clone();
            tasks.spawn(async move {
                println!("[{i}] spawn");
                let mut expected_value = 0;
                loop {
                    tokio::select! {
                        biased;
                        Some(value) = watch.next() => {
                            println!("{:?} [{i}] stream update: {value}", start.elapsed());
                            assert_eq!(value, expected_value);
                            expected_value += 1;
                        },
                        _ = cancel.cancelled() => {
                            println!("{:?} [{i}] cancel", start.elapsed());
                            assert_eq!(expected_value, 10);
                            break;
                        }
                        else => {
                            panic!("stream died");
                        }
                    }
                }
            });
        }

        for next_value in 0..10 {
            // Sleep a random amount (below 100ms) between two sets.
            let sleep = Duration::from_nanos(rng().random_range(0..100_000_000));
            println!("{:?} sleep {sleep:?}", start.elapsed());
            tokio::time::sleep(sleep).await;

            let changed = watchable.set(next_value);
            println!("{:?} set {next_value} changed={changed:?}", start.elapsed());
        }

        println!("cancel");
        cancel.cancel();
        while let Some(res) = tasks.join_next().await {
            res.expect("task failed");
        }
    }
919
920 #[test]
921 fn test_get() {
922 let watchable = Watchable::new(None);
923 assert!(watchable.get().is_none());
924
925 watchable.set(Some(1u8)).ok();
926 assert_eq!(watchable.get(), Some(1u8));
927 }
928
    /// `initialized()` stays pending while the value is `None` and resolves after a
    /// `Some` has been set.
    #[tokio::test]
    async fn test_initialize() {
        let watchable = Watchable::new(None);

        let mut watcher = watchable.watch();
        let mut initialized = watcher.initialized();

        // Nothing has been set yet, so a single poll must not produce a value.
        let poll = poll_once(&mut initialized).await;
        assert!(poll.is_none());

        watchable.set(Some(1u8)).ok();

        // Polling the same future again now yields the value.
        let poll = poll_once(&mut initialized).await;
        assert_eq!(poll.unwrap(), 1u8);
    }
944
945 #[tokio::test]
946 async fn test_initialize_already_init() {
947 let watchable = Watchable::new(Some(1u8));
948
949 let mut watcher = watchable.watch();
950 let mut initialized = watcher.initialized();
951
952 let poll = poll_once(&mut initialized).await;
953 assert_eq!(poll.unwrap(), 1u8);
954 }
955
    /// `initialized()` resolves even when the value is set concurrently from another
    /// thread. Runs under the `loom` model when built with `watcher_loom`.
    #[test]
    fn test_initialized_always_resolves() {
        #[cfg(not(watcher_loom))]
        use std::thread;

        #[cfg(watcher_loom)]
        use loom::thread;

        let test_case = || {
            let watchable = Watchable::<Option<u8>>::new(None);

            let mut watch = watchable.watch();
            // The spawned thread blocks until the watcher has seen a `Some` value.
            let thread = thread::spawn(move || n0_future::future::block_on(watch.initialized()));

            watchable.set(Some(42)).ok();

            thread::yield_now();

            let value: u8 = thread.join().unwrap();

            assert_eq!(value, 42);
        };

        #[cfg(watcher_loom)]
        loom::model(test_case);
        #[cfg(not(watcher_loom))]
        test_case();
    }
984
    /// `updated()` futures that get cancelled by a competing `select!` branch must
    /// never cause the same value to be observed twice.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_update_cancel_safety() {
        let watchable = Watchable::new(0);
        let mut watch = watchable.watch();
        const MAX: usize = 100_000;

        let handle = tokio::spawn(async move {
            let mut last_observed = 0;

            while last_observed != MAX {
                tokio::select! {
                    val = watch.updated() => {
                        let Ok(val) = val else {
                            return;
                        };

                        assert_ne!(val, last_observed, "never observe the same value twice, even with cancellation");
                        last_observed = val;
                    }
                    // A randomly short sleep that regularly wins the race and thereby
                    // drops (cancels) the pending `updated()` future.
                    _ = tokio::time::sleep(Duration::from_micros(rng().random_range(0..10_000))) => {
                        continue;
                    }
                }
            }
        });

        for i in 1..=MAX {
            watchable.set(i).ok();
            // Occasionally yield so the reader task gets a chance to run in between.
            if rng().random_bool(0.2) {
                tokio::task::yield_now().await;
            }
        }

        tokio::time::timeout(Duration::from_secs(10), handle)
            .await
            .unwrap()
            .unwrap()
    }
1024
    /// A `Join` of two watchers reflects changes of either source, both through `get`
    /// and through a stream created from a clone of it.
    #[tokio::test]
    async fn test_join_simple() {
        let a = Watchable::new(1u8);
        let b = Watchable::new(1u8);

        let mut ab = Join::new([a.watch(), b.watch()].into_iter());

        // The stream is collected in a background task; it takes the first five items.
        let stream = ab.clone().stream();
        let handle = tokio::task::spawn(async move { stream.take(5).collect::<Vec<_>>().await });

        assert_eq!(ab.get(), vec![1, 1]);
        a.set(2u8).unwrap();
        // Yield after each change so the background task can pick it up separately.
        tokio::task::yield_now().await;
        assert_eq!(ab.get(), vec![2, 1]);
        b.set(3u8).unwrap();
        tokio::task::yield_now().await;
        assert_eq!(ab.get(), vec![2, 3]);

        a.set(3u8).unwrap();
        tokio::task::yield_now().await;
        b.set(4u8).unwrap();
        tokio::task::yield_now().await;

        let values = tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            values,
            vec![vec![1, 1], vec![2, 1], vec![2, 3], vec![3, 3], vec![3, 4]]
        );
    }
1060
1061 #[tokio::test]
1062 async fn test_updated_then_disconnect_then_get() {
1063 let watchable = Watchable::new(10);
1064 let mut watcher = watchable.watch();
1065 assert_eq!(watchable.get(), 10);
1066 watchable.set(42).ok();
1067 assert_eq!(watcher.updated().await.unwrap(), 42);
1068 drop(watchable);
1069 assert_eq!(watcher.get(), 42);
1070 }
1071
    /// Dropping the `Watchable` wakes a pending `updated()` future, which then
    /// resolves with an error.
    #[tokio::test(start_paused = true)]
    async fn test_update_wakeup_on_watchable_drop() {
        let watchable = Watchable::new(10);
        let mut watcher = watchable.watch();

        let start = Instant::now();
        let (_, result) = tokio::time::timeout(Duration::from_secs(2), async move {
            tokio::join!(
                async move {
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    drop(watchable);
                },
                async move { watcher.updated().await }
            )
        })
        .await
        .expect("watcher never updated");
        // The watcher must have been woken by the drop after the 1s sleep, well before
        // the 2s timeout (the test runs with tokio's clock paused).
        assert_eq!(start.elapsed(), Duration::from_secs(1));
        assert!(result.is_err());
    }
1094
    /// Every value yielded by `updated()` differs from the previous one, even when a
    /// clone of the `Watchable` is dropped in between.
    #[tokio::test(start_paused = true)]
    async fn test_update_wakeup_always_a_change() {
        let watchable = Watchable::new(10);
        let mut watcher = watchable.watch();

        let task = tokio::spawn(async move {
            let mut last_value = watcher.get();
            let mut values = Vec::new();
            while let Ok(value) = watcher.updated().await {
                values.push(value);
                if last_value == value {
                    return Err("value duplicated");
                }
                last_value = value;
            }
            Ok(values)
        });

        tokio::time::sleep(Duration::from_millis(100)).await;

        watchable.set(11).ok();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let clone = watchable.clone();
        // Dropping a clone (not the last handle) must not produce an update.
        drop(clone);
        tokio::time::sleep(Duration::from_millis(100)).await;
        for i in 1..=10 {
            watchable.set(i + 11).ok();
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        // Dropping the last handle ends the `while let` loop in the task.
        drop(watchable);

        let values = task
            .await
            .expect("task panicked")
            .expect("value duplicated");
        assert_eq!(values, vec![11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]);
    }
1133
1134 #[test]
1135 fn test_has_watchers() {
1136 let a = Watchable::new(1u8);
1137 assert!(!a.has_watchers());
1138 let b = a.clone();
1139 assert!(!a.has_watchers());
1140 assert!(!b.has_watchers());
1141
1142 let watcher = a.watch();
1143 assert!(a.has_watchers());
1144 assert!(b.has_watchers());
1145
1146 drop(watcher);
1147
1148 assert!(!a.has_watchers());
1149 assert!(!b.has_watchers());
1150 }
1151
1152 #[tokio::test]
1153 async fn test_three_watchers_basic() {
1154 let watchable = Watchable::new(1u8);
1155
1156 let mut w1 = watchable.watch();
1157 let mut w2 = watchable.watch();
1158 let mut w3 = watchable.watch();
1159
1160 assert_eq!(w1.get(), 1);
1163 assert_eq!(w2.get(), 1);
1164 assert_eq!(w3.get(), 1);
1165
1166 watchable.set(42).unwrap();
1168
1169 assert_eq!(w1.updated().await.unwrap(), 42);
1171 assert_eq!(w2.updated().await.unwrap(), 42);
1172 assert_eq!(w3.updated().await.unwrap(), 42);
1173 }
1174
1175 #[tokio::test]
1176 async fn test_three_watchers_skip_intermediate() {
1177 let watchable = Watchable::new(0u8);
1178 let mut watcher = watchable.watch();
1179
1180 watchable.set(1).ok();
1181 watchable.set(2).ok();
1182 watchable.set(3).ok();
1183 watchable.set(4).ok();
1184
1185 let value = watcher.updated().await.unwrap();
1186
1187 assert_eq!(value, 4);
1188 }
1189
1190 #[tokio::test]
1191 async fn test_three_watchers_with_streams() {
1192 let watchable = Watchable::new(10u8);
1193
1194 let mut stream1 = watchable.watch().stream();
1195 let mut stream2 = watchable.watch().stream();
1196 let mut stream3 = watchable.watch().stream_updates_only();
1197
1198 assert_eq!(stream1.next().await.unwrap(), 10);
1199 assert_eq!(stream2.next().await.unwrap(), 10);
1200
1201 watchable.set(20).ok();
1203
1204 assert_eq!(stream1.next().await.unwrap(), 20);
1206 assert_eq!(stream2.next().await.unwrap(), 20);
1207 assert_eq!(stream3.next().await.unwrap(), 20);
1208 }
1209
1210 #[tokio::test]
1211 async fn test_three_watchers_independent() {
1212 let watchable = Watchable::new(0u8);
1213
1214 let mut fast_watcher = watchable.watch();
1215 let mut slow_watcher = watchable.watch();
1216 let mut lazy_watcher = watchable.watch();
1217
1218 watchable.set(1).ok();
1219 assert_eq!(fast_watcher.updated().await.unwrap(), 1);
1220
1221 watchable.set(2).ok();
1223 watchable.set(3).ok();
1224
1225 assert_eq!(slow_watcher.updated().await.unwrap(), 3);
1226 assert_eq!(lazy_watcher.get(), 3);
1227 }
1228
1229 #[tokio::test]
1230 async fn test_combine_three_watchers() {
1231 let a = Watchable::new(1u8);
1232 let b = Watchable::new(2u8);
1233 let c = Watchable::new(3u8);
1234
1235 let mut combined = Triple::new(a.watch(), b.watch(), c.watch());
1236
1237 assert_eq!(combined.get(), (1, 2, 3));
1238
1239 b.set(20).ok();
1241
1242 assert_eq!(combined.updated().await.unwrap(), (1, 20, 3));
1243
1244 c.set(30).ok();
1245 assert_eq!(combined.updated().await.unwrap(), (1, 20, 30));
1246 }
1247
1248 #[tokio::test]
1249 async fn test_three_watchers_disconnection() {
1250 let watchable = Watchable::new(5u8);
1251
1252 let mut w1 = watchable.watch();
1254 let mut w2 = watchable.watch();
1255 let mut w3 = watchable.watch();
1256
1257 drop(watchable);
1259
1260 assert!(!w1.is_connected());
1262 assert!(!w2.is_connected());
1263 assert!(!w3.is_connected());
1264
1265 assert_eq!(w1.get(), 5);
1267 assert_eq!(w2.get(), 5);
1268
1269 assert!(w3.updated().await.is_err());
1271 }
1272
1273 #[tokio::test]
1274 async fn test_three_watchers_truly_concurrent() {
1275 use tokio::time::sleep;
1276 let watchable = Watchable::new(0u8);
1277
1278 let mut reader_handles = vec![];
1280 for i in 0..3 {
1281 let mut watcher = watchable.watch();
1282 let handle = tokio::spawn(async move {
1283 let mut values = vec![];
1284 for _ in 0..5 {
1286 if let Ok(value) = watcher.updated().await {
1287 values.push(value);
1288 } else {
1289 break;
1290 }
1291 }
1292 (i, values)
1293 });
1294 reader_handles.push(handle);
1295 }
1296
1297 let mut writer_handles = vec![];
1299 for i in 0..3 {
1300 let watchable_clone = watchable.clone();
1301 let handle = tokio::spawn(async move {
1302 for j in 0..5 {
1303 let value = (i * 10) + j;
1304 watchable_clone.set(value).ok();
1305 sleep(Duration::from_millis(5)).await;
1306 }
1307 });
1308 writer_handles.push(handle);
1309 }
1310
1311 for handle in writer_handles {
1313 handle.await.unwrap();
1314 }
1315
1316 for handle in reader_handles {
1318 let (task_id, values) = handle.await.unwrap();
1319 println!("Reader {}: saw values {:?}", task_id, values);
1320 assert!(!values.is_empty());
1321 }
1322 }
1323
1324 #[tokio::test]
1325 async fn test_peek() {
1326 let a = Watchable::new(vec![1, 2, 3]);
1327 let mut wa = a.watch();
1328
1329 assert_eq!(wa.get(), vec![1, 2, 3]);
1330 assert_eq!(wa.peek(), &vec![1, 2, 3]);
1331
1332 let mut wa_map = wa.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1333
1334 assert_eq!(wa_map.get(), vec![2, 4, 6]);
1335 assert_eq!(wa_map.peek(), &vec![2, 4, 6]);
1336
1337 let mut wb = a.watch();
1338
1339 assert_eq!(wb.get(), vec![1, 2, 3]);
1340 assert_eq!(wb.peek(), &vec![1, 2, 3]);
1341
1342 let mut wb_map = wb.map(|a| a.into_iter().map(|a| a * 2).collect::<Vec<_>>());
1343
1344 assert_eq!(wb_map.get(), vec![2, 4, 6]);
1345 assert_eq!(wb_map.peek(), &vec![2, 4, 6]);
1346
1347 let mut w_join = Join::new([wa_map, wb_map].into_iter());
1348
1349 assert_eq!(w_join.get(), vec![vec![2, 4, 6], vec![2, 4, 6]]);
1350 assert_eq!(w_join.peek(), &vec![vec![2, 4, 6], vec![2, 4, 6]]);
1351 }
1352
    /// `peek` only changes after `update` was called, for direct, mapped and joined
    /// watchers alike.
    #[tokio::test]
    async fn test_update_updates_peek() {
        let value = Watchable::new(42);
        let mut watcher = value.watch();

        assert_eq!(watcher.peek(), &42);
        assert!(!watcher.update());

        value.set(50).ok();

        // `peek` still shows the old value until `update` is called.
        assert_eq!(watcher.peek(), &42);
        assert!(watcher.update());
        assert_eq!(watcher.peek(), &50);
        assert!(!watcher.update());

        let mut watcher_map = watcher.clone().map(|v| v * 2);

        assert_eq!(watcher_map.peek(), &100);
        assert!(!watcher_map.update());

        value.set(10).ok();

        assert_eq!(watcher_map.peek(), &100);
        assert!(watcher_map.update());
        assert_eq!(watcher_map.peek(), &20);
        assert!(!watcher_map.update());

        let value2 = Watchable::new(0);
        // `Join::new` refreshes its watchers, so `watcher` is brought up to 10 here.
        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());

        assert_eq!(watcher_join.peek(), &vec![10, 0]);
        assert!(!watcher_join.update());

        value.set(0).ok();
        value2.set(1).ok();

        assert_eq!(watcher_join.peek(), &vec![10, 0]);
        assert!(watcher_join.update());
        assert_eq!(watcher_join.peek(), &vec![0, 1]);
        assert!(!watcher_join.update());
    }
1394
    /// `get` refreshes the watcher, so a following `peek` shows the new value and a
    /// following `update` reports no change.
    #[tokio::test]
    async fn test_get_updates_peek() {
        let value = Watchable::new(42);
        let mut watcher = value.watch();

        assert_eq!(watcher.peek(), &42);
        assert!(!watcher.update());

        value.set(50).ok();

        // `peek` still shows the old value until `get` refreshes the watcher.
        assert_eq!(watcher.peek(), &42);
        assert_eq!(watcher.get(), 50);
        assert_eq!(watcher.peek(), &50);
        assert!(!watcher.update());

        let mut watcher_map = watcher.clone().map(|v| v * 2);

        assert_eq!(watcher_map.peek(), &100);
        assert!(!watcher_map.update());

        value.set(10).ok();

        assert_eq!(watcher_map.peek(), &100);
        assert_eq!(watcher_map.get(), 20);
        assert_eq!(watcher_map.peek(), &20);
        assert!(!watcher_map.update());

        let value2 = Watchable::new(0);
        // `Join::new` refreshes its watchers, so `watcher` is brought up to 10 here.
        let mut watcher_join = Join::new([watcher, value2.watch()].into_iter());

        assert_eq!(watcher_join.peek(), &vec![10, 0]);
        assert!(!watcher_join.update());

        value.set(0).ok();
        value2.set(1).ok();

        assert_eq!(watcher_join.peek(), &vec![10, 0]);
        assert_eq!(watcher_join.get(), vec![0, 1]);
        assert_eq!(watcher_join.peek(), &vec![0, 1]);
        assert!(!watcher_join.update());
    }
1436}