// iroh_blobs/store/fs/util.rs
use std::future::Future;
use tokio::{select, sync::mpsc};
/// A wrapper for a tokio mpsc receiver that allows peeking at the next message.
///
/// "Peeking" is implemented with a single-slot buffer: a message that was
/// taken from the channel but not consumed can be stored in the wrapper and
/// is handed out again by the next call to [`Self::recv`].
#[derive(Debug)]
pub struct PeekableReceiver<T> {
/// One-message buffer. Holds a message that was put back (by
/// [`Self::push_back`] or by a rejected [`Self::extract`]); `None` when
/// nothing is buffered. [`Self::recv`] drains this before touching the channel.
msg: Option<T>,
/// The wrapped channel receiver that messages are ultimately read from.
recv: mpsc::Receiver<T>,
}
// NOTE(review): presumably present because not every helper below has a
// caller in every build configuration — confirm before removing.
#[allow(dead_code)]
impl<T> PeekableReceiver<T> {
    /// Wrap `recv`, starting with an empty one-message buffer.
    pub fn new(recv: mpsc::Receiver<T>) -> Self {
        Self { recv, msg: None }
    }

    /// Receive the next message.
    ///
    /// A buffered (pushed-back) message is returned first, without awaiting.
    /// Otherwise this waits on the underlying channel until a message arrives.
    ///
    /// Returns `None` only when the channel is closed and no messages remain
    /// (e.g. every sender has been dropped).
    ///
    /// Cancel safe: the buffer is emptied only on the path that returns
    /// immediately, and the sole await point is the channel's `recv()`,
    /// which is itself cancel safe.
    pub async fn recv(&mut self) -> Option<T> {
        match self.msg.take() {
            buffered @ Some(_) => buffered,
            None => self.recv.recv().await,
        }
    }

    /// Receive the next message, but only if it passes the filter.
    ///
    /// `f` takes ownership of the message and returns `Ok(u)` to accept it or
    /// `Err(msg)` to hand it back unchanged. `timeout` is any future; its
    /// output is ignored and its completion aborts the wait.
    ///
    /// Returns `Some(u)` when a message arrived and `f` accepted it.
    /// Returns `None` when
    /// - `timeout` completed before a message was available,
    /// - the channel is closed and empty, or
    /// - `f` rejected the message; it is then stored in the buffer and will be
    ///   the next message returned by [`Self::recv`].
    ///
    /// Cancel safe because the only async operation is the [Self::recv] call,
    /// which is cancel safe.
    pub async fn extract<U>(
        &mut self,
        f: impl Fn(T) -> std::result::Result<U, T>,
        timeout: impl Future + Unpin,
    ) -> Option<U> {
        // Race the next message against the caller-supplied timeout.
        let next = select! {
            received = self.recv() => received,
            _ = timeout => return None,
        };
        // `None` here means the channel is closed and drained.
        let candidate = next?;
        let rejected = match f(candidate) {
            Ok(extracted) => return Some(extracted),
            Err(rejected) => rejected,
        };
        // Put the rejected message back so the next `recv` sees it again.
        self.msg = Some(rejected);
        None
    }

    /// Push back a message so that it is returned by the next [`Self::recv`].
    ///
    /// The buffer holds a single message: if it is already occupied the call
    /// fails and the message is handed back as `Err(msg)`.
    pub fn push_back(&mut self, msg: T) -> std::result::Result<(), T> {
        if self.msg.is_some() {
            return Err(msg);
        }
        self.msg = Some(msg);
        Ok(())
    }
}