Crate n0_watcher

Source
Expand description

Watchable values.

A Watchable exists to keep track of a value which may change over time. It allows observers to be notified of changes to the value. The aim is to always be aware of the last value, not to observe every value change.

The reason for this is ergonomics and predictable resource usage: Requiring every intermediate value to be observable would mean that either the side that sets new values using Watchable::set would need to wait for all “receivers” of these intermediate values to catch up and thus be an async operation, or it would require the receivers to buffer intermediate values until they’ve been “received” on the Watchers with an unlimited buffer size and thus potentially unlimited memory growth.

§Example

use n0_future::StreamExt;
use n0_watcher::{Watchable, Watcher as _};

#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
    let watchable = Watchable::new(None);

    // A task that waits for the watcher to be initialized to Some(value) before printing it
    let mut watcher = watchable.watch();
    tokio::spawn(async move {
        let initialized_value = watcher.initialized().await;
        println!("initialized: {initialized_value}");
    });

    // A task that prints every update to the watcher since the initial one:
    let mut updates = watchable.watch().stream_updates_only();
    tokio::spawn(async move {
        while let Some(update) = updates.next().await {
            println!("update: {update:?}");
        }
    });

    // A task that prints the current value and then every update it can catch,
    // but it also does something else which makes it very slow to pick up new
    // values, so it'll skip some:
    let mut current_and_updates = watchable.watch().stream();
    tokio::spawn(async move {
        while let Some(update) = current_and_updates.next().await {
            println!("update2: {update:?}");
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        }
    });

    for i in 0..20 {
        println!("Setting watchable to {i}");
        watchable.set(Some(i)).ok();
        tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
    }
}

§Similar but different

  • async_channel: This is a multi-producer, multi-consumer channel implementation. Only at most one consumer will receive each “produced” value. What we want is to have every “produced” value to be “broadcast” to every receiver.
  • tokio::broadcast: Also a multi-producer, multi-consumer channel implementation. This is very similar to this crate (tokio::broadcast::Sender is like Watchable and tokio::broadcast::Receiver is like Watcher), but you can’t get the latest value without .awaiting on the receiver, and it’ll internally store a queue of intermediate values.
  • tokio::watch: Also a MPSC channel, and unlike tokio::broadcast only retains the latest value. That module has pretty much the same purpose as this crate, but doesn’t implement a poll-based method of getting updates and doesn’t implement combinators.
  • std::sync::RwLock: (wrapped in an std::sync::Arc) This allows you access to the latest values, but might block while it’s being set (but that could be short enough not to matter for async rust purposes). This doesn’t allow you to be notified whenever a new value is written.
  • The watchable crate: We used to use this crate at n0, but we wanted to experiment with different APIs and needed Wasm support.

Structs§

Traits§

  • Abstracts over Option<T> and Vec<T>
  • A handle to a value that’s represented by one or more underlying Watchables.