Expand description
Watchable values.
A Watchable
exists to keep track of a value which may change over time. It allows
observers to be notified of changes to the value. The aim is to always be aware of the
last value, not to observe every value change.
The reason for this is ergonomics and predictable resource usage: Requiring every
intermediate value to be observable would mean that either the side that sets new values
using Watchable::set
would need to wait for all “receivers” of these intermediate
values to catch up and thus be an async operation, or it would require the receivers
to buffer intermediate values until they’ve been “received” on the Watcher
s with
an unlimited buffer size and thus potentially unlimited memory growth.
§Example
use n0_future::StreamExt;
use n0_watcher::{Watchable, Watcher as _};
#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
let watchable = Watchable::new(None);
// A task that waits for the watcher to be initialized to Some(value) before printing it
let mut watcher = watchable.watch();
tokio::spawn(async move {
let initialized_value = watcher.initialized().await;
println!("initialized: {initialized_value}");
});
// A task that prints every update to the watcher since the initial one:
let mut updates = watchable.watch().stream_updates_only();
tokio::spawn(async move {
while let Some(update) = updates.next().await {
println!("update: {update:?}");
}
});
// A task that prints the current value and then every update it can catch,
// but it also does something else which makes it very slow to pick up new
// values, so it'll skip some:
let mut current_and_updates = watchable.watch().stream();
tokio::spawn(async move {
while let Some(update) = current_and_updates.next().await {
println!("update2: {update:?}");
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
});
for i in 0..20 {
println!("Setting watchable to {i}");
watchable.set(Some(i)).ok();
tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
}
}
§Similar but different
async_channel
: This is a multi-producer, multi-consumer channel implementation. Only at most one consumer will receive each “produced” value. What we want is to have every “produced” value to be “broadcast” to every receiver.tokio::broadcast
: Also a multi-producer, multi-consumer channel implementation. This is very similar to this crate (tokio::broadcast::Sender
is likeWatchable
andtokio::broadcast::Receiver
is likeWatcher
), but you can’t get the latest value without.await
ing on the receiver, and it’ll internally store a queue of intermediate values.tokio::watch
: Also a MPSC channel, and unliketokio::broadcast
only retains the latest value. That module has pretty much the same purpose as this crate, but doesn’t implement a poll-based method of getting updates and doesn’t implement combinators.std::sync::RwLock
: (wrapped in anstd::sync::Arc
) This allows you access to the latest values, but might block while it’s being set (but that could be short enough not to matter for async rust purposes). This doesn’t allow you to be notified whenever a new value is written.- The
watchable
crate: We used to use this crate at n0, but we wanted to experiment with different APIs and needed Wasm support.
Structs§
- The immediate, direct observer of a
Watchable
value. - Wraps a
Watcher
to allow observing a derived value. - Combinator to join two watchers
- Wraps a
Watcher
to allow observing a derived value. - Future returning the next item after the current one in a
Watcher
. - A stream for a
Watcher
’s next values. - A wrapper around a value that notifies
Watcher
s when the value is modified.
Traits§
- Abstracts over
Option<T>
andVec<T>
- A handle to a value that’s represented by one or more underlying
Watchable
s.