Watchable values.
A Watchable exists to keep track of a value which may change over time. It allows
observers to be notified of changes to the value. The aim is to always be aware of the
last value, not to observe every value change.
The reason for this is ergonomics and predictable resource usage. Requiring every
intermediate value to be observable would force one of two trade-offs. Either the side
that sets new values using Watchable::set would need to wait for all “receivers” of
these intermediate values to catch up, making set an async operation, or the receivers
would have to buffer intermediate values until they have been “received” on the Watchers,
which requires an unlimited buffer size and thus potentially unlimited memory growth.
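The trade-off described above can be illustrated with a small, standard-library-only sketch. This is not how this crate is implemented: `LatestValue`, `wait_newer`, `skip_to_latest` and `wake_reader` are hypothetical names made up for the illustration, and the sketch uses blocking threads rather than async tasks. It only shows the idea of storing a single slot plus a version counter, so that setting never waits and never queues, and a slow reader jumps straight to the newest value.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical latest-value cell: one slot plus a version counter.
/// Memory use stays constant no matter how often `set` is called.
struct LatestValue<T> {
    state: Mutex<(T, u64)>, // (current value, version bumped on every set)
    changed: Condvar,
}

impl<T: Clone> LatestValue<T> {
    fn new(value: T) -> Self {
        Self {
            state: Mutex::new((value, 0)),
            changed: Condvar::new(),
        }
    }

    /// Overwrites the stored value. Never waits for readers, never queues.
    fn set(&self, value: T) {
        let mut state = self.state.lock().unwrap();
        state.0 = value;
        state.1 += 1;
        self.changed.notify_all();
    }

    /// Blocks until the version differs from `seen`, then returns the newest
    /// value together with its version. Values overwritten in between are lost.
    fn wait_newer(&self, seen: u64) -> (T, u64) {
        let mut state = self.state.lock().unwrap();
        while state.1 == seen {
            state = self.changed.wait(state).unwrap();
        }
        (state.0.clone(), state.1)
    }
}

/// Sets 20 values back to back, then lets a reader that last saw version 0
/// catch up: it observes only the final value.
fn skip_to_latest() -> (u32, u64) {
    let cell = LatestValue::new(0u32);
    for i in 1..=20 {
        cell.set(i);
    }
    cell.wait_newer(0)
}

/// A reader thread is woken by a single `set` from the writer.
fn wake_reader() -> u32 {
    let cell = Arc::new(LatestValue::new(0u32));
    let reader = {
        let cell = Arc::clone(&cell);
        thread::spawn(move || cell.wait_newer(0).0)
    };
    cell.set(7);
    reader.join().unwrap()
}
```

Calling `skip_to_latest()` shows a reader that missed 19 intermediate values and still ends up with the newest one, which is the behaviour the paragraph above argues for. A queue-based design would instead have to hold all 20 values until the reader consumed them.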
§Example
use n0_future::StreamExt;
use n0_watcher::{Watchable, Watcher as _};

#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
    let watchable = Watchable::new(None);

    // A task that waits for the watcher to be initialized to Some(value) before printing it
    let mut watcher = watchable.watch();
    tokio::spawn(async move {
        let initialized_value = watcher.initialized().await;
        println!("initialized: {initialized_value}");
    });

    // A task that prints every update to the watcher since the initial one:
    let mut updates = watchable.watch().stream_updates_only();
    tokio::spawn(async move {
        while let Some(update) = updates.next().await {
            println!("update: {update:?}");
        }
    });

    // A task that prints the current value and then every update it can catch,
    // but it also does something else which makes it very slow to pick up new
    // values, so it'll skip some:
    let mut current_and_updates = watchable.watch().stream();
    tokio::spawn(async move {
        while let Some(update) = current_and_updates.next().await {
            println!("update2: {update:?}");
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        }
    });

    for i in 0..20 {
        println!("Setting watchable to {i}");
        watchable.set(Some(i)).ok();
        tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
    }
}
§Similar but different
- async_channel: This is a multi-producer, multi-consumer channel implementation. At most one consumer will receive each “produced” value. What we want is for every “produced” value to be “broadcast” to every receiver.
- tokio::broadcast: Also a multi-producer, multi-consumer channel implementation. This is very similar to this crate (tokio::broadcast::Sender is like Watchable and tokio::broadcast::Receiver is like Watcher), but you can’t get the latest value without .awaiting on the receiver, and it’ll internally store a queue of intermediate values.
- tokio::watch: Also a multi-producer, multi-consumer channel, and unlike tokio::broadcast it only retains the latest value. That module has pretty much the same purpose as this crate, but doesn’t implement a poll-based method of getting updates and doesn’t implement combinators.
- std::sync::RwLock: (wrapped in an std::sync::Arc) This gives you access to the latest value, but might block while it’s being set (though that could be short enough not to matter for async Rust purposes). This doesn’t allow you to be notified whenever a new value is written.
- The watchable crate: We used to use this crate at n0, but we wanted to experiment with different APIs and needed Wasm support.
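To make the std::sync::RwLock comparison concrete, here is a minimal sketch using only the standard library. The function names `poll_for_change` and `rwlock_demo` are hypothetical and exist only for this illustration. It shows that a reader sharing an Arc<RwLock<_>> can always read the latest value, but has no way of being told about a write, so it has to poll.

```rust
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

/// Repeatedly reads `shared` until its value differs from `last_seen`.
/// The lock offers no notification, so the reader sleeps and retries.
fn poll_for_change(shared: &RwLock<u32>, last_seen: u32) -> u32 {
    loop {
        let current = *shared.read().unwrap();
        if current != last_seen {
            return current;
        }
        // Polling interval: a trade-off between latency and wasted wake-ups.
        thread::sleep(Duration::from_millis(1));
    }
}

/// Spawns a polling reader, writes one new value, and returns what the
/// reader eventually observed.
fn rwlock_demo() -> u32 {
    let shared = Arc::new(RwLock::new(0u32));
    let reader = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || poll_for_change(&shared, 0))
    };
    *shared.write().unwrap() = 42;
    reader.join().unwrap()
}
```

The reader in `rwlock_demo()` does eventually see the new value, but only because it keeps checking. A notification mechanism such as the one this crate provides removes the need for the polling loop.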
Structs§
- Direct: The immediate, direct observer of a Watchable value.
- Disconnected: The error for when a Watcher is disconnected from its underlying Watchable value, because of that watchable having been dropped.
- InitializedFut: Future returning the current or next value in a Watcher that is a Some value.
- Join: Combinator to join two watchers.
- LazyDirect: A lazy direct observer of a Watchable value.
- Map: Wraps a Watcher to allow observing a derived value.
- NextFut: Future returning the next item after the current one in a Watcher.
- Stream: A stream for a Watcher’s next values.
- Watchable: A wrapper around a value that notifies Watchers when the value is modified.
- WeakWatcher: A weak reference to a watchable value that can be upgraded to a full watcher later.