use std::{
    collections::{BTreeMap, HashMap, HashSet},
    future::Future,
    io::{self, Write},
    num::NonZeroU64,
    ops::Deref,
    sync::Arc,
    time::SystemTime,
};
use bao_tree::{
    blake3,
    io::{
        mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
        outboard::PreOrderMemOutboard,
        sync::{Outboard, ReadAt, WriteAt},
        BaoContentItem, EncodeError, Leaf,
    },
    BaoTree, ChunkNum, ChunkRanges, TreeNode,
};
use bytes::Bytes;
use irpc::channel::mpsc;
use n0_future::future::yield_now;
use range_collections::range_set::RangeSetRange;
use tokio::{
    io::AsyncReadExt,
    sync::watch,
    task::{JoinError, JoinSet},
};
use tracing::{error, info, instrument, trace, Instrument};
use super::util::{BaoTreeSender, PartialMemStorage};
use crate::{
    api::{
        self,
        blobs::{AddProgressItem, Bitfield, BlobStatus, ExportProgressItem},
        proto::{
            BatchMsg, BatchResponse, BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, Command,
            CreateTagMsg, CreateTagRequest, CreateTempTagMsg, DeleteBlobsMsg, DeleteTagsMsg,
            DeleteTagsRequest, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest,
            ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportBaoRequest,
            ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
            ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
            ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
            SetTagRequest, ShutdownMsg, SyncDbMsg,
        },
        tags::TagInfo,
        ApiClient,
    },
    store::{
        util::{SizeInfo, SparseMemFile, Tag},
        HashAndFormat, IROH_BLOCK_SIZE,
    },
    util::{
        temp_tag::{TagDrop, TempTagScope, TempTags},
        ChunkRangesExt,
    },
    BlobFormat, Hash,
};
/// Options for the in-memory store.
///
/// Currently has no fields.
#[derive(Debug, Default)]
pub struct Options {}
/// Handle to an in-memory blob store.
///
/// Wraps the [`ApiClient`] through which commands are sent to the store
/// actor. Cloning the handle clones the client.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct MemStore {
    /// Client used to talk to the store actor.
    client: ApiClient,
}
/// Borrows the wrapped client as a [`crate::api::Store`].
impl AsRef<crate::api::Store> for MemStore {
    fn as_ref(&self) -> &crate::api::Store {
        crate::api::Store::ref_from_sender(&self.client)
    }
}
/// Lets [`crate::api::Store`] methods be called directly on a `MemStore`.
impl Deref for MemStore {
    type Target = crate::api::Store;
    fn deref(&self) -> &Self::Target {
        // Same conversion as the `AsRef` impl.
        crate::api::Store::ref_from_sender(&self.client)
    }
}
/// The default store is whatever [`MemStore::new`] creates.
impl Default for MemStore {
    fn default() -> Self {
        Self::new()
    }
}
/// Output of a task spawned on the actor's `JoinSet`.
///
/// `derive_more::From` generates a `From` impl for each variant payload, so
/// `Actor::spawn` can accept futures that produce any of these payload types.
#[derive(derive_more::From)]
enum TaskResult {
    /// The task produced nothing the actor needs to act on.
    Unit(()),
    /// An import task finished; the actor passes the result to `finish_import`.
    Import(anyhow::Result<ImportEntry>),
    /// A batch task finished; the actor ends the temp tag scope with this id.
    Scope(Scope),
}
impl MemStore {
    /// Wraps an existing API client in a `MemStore` handle.
    pub fn from_sender(client: ApiClient) -> Self {
        Self { client }
    }

    /// Creates a new store with no blobs and no tags, and spawns its actor.
    ///
    /// The actor is started with `tokio::spawn`, so this must be called from
    /// within a tokio runtime. The returned handle talks to the actor over a
    /// command channel with a capacity of 32.
    pub fn new() -> Self {
        let (sender, receiver) = tokio::sync::mpsc::channel(32);
        let state = State {
            data: HashMap::new(),
            tags: BTreeMap::new(),
            // The empty blob gets its own dedicated handle outside of `data`.
            empty_hash: BaoFileHandle::new_partial(Hash::EMPTY),
        };
        let actor = Actor {
            commands: receiver,
            tasks: JoinSet::new(),
            state,
            options: Arc::new(Options::default()),
            temp_tags: Default::default(),
            protected: Default::default(),
        };
        tokio::spawn(actor.run());
        Self::from_sender(sender.into())
    }
}
/// The actor that owns the store state and handles commands one at a time.
struct Actor {
    /// Incoming commands.
    commands: tokio::sync::mpsc::Receiver<Command>,
    /// Background tasks spawned while handling commands.
    tasks: JoinSet<TaskResult>,
    /// Blobs and tags.
    state: State,
    /// Store options; not read anywhere yet.
    #[allow(dead_code)]
    options: Arc<Options>,
    /// Temp tags and their scopes.
    temp_tags: TempTags,
    /// Hashes that `DeleteBlobs` skips unless the request sets `force`.
    protected: HashSet<Hash>,
}
impl Actor {
    /// Spawns `f` onto the actor's task set.
    ///
    /// The output of `f` is converted into a [`TaskResult`], and the task is
    /// instrumented with the tracing span that is current at the time of the
    /// call.
    fn spawn<F, T>(&mut self, f: F)
    where
        F: Future<Output = T> + Send + 'static,
        T: Into<TaskResult>,
    {
        let task = async move { f.await.into() };
        let current = tracing::Span::current();
        self.tasks.spawn(task.instrument(current));
    }
    /// Handles a single command.
    ///
    /// Imports, exports, observe, batches and blob listing are spawned onto
    /// `self.tasks`; tag and bookkeeping commands are answered inline.
    /// Failures to deliver a reply are ignored (`.ok()`).
    ///
    /// Returns `Some(msg)` only for `Command::Shutdown`, handing the message
    /// back to the caller. Every other command returns `None`.
    async fn handle_command(&mut self, cmd: Command) -> Option<ShutdownMsg> {
        match cmd {
            // Import bao-encoded content into the (possibly new) entry for `hash`.
            Command::ImportBao(ImportBaoMsg {
                inner: ImportBaoRequest { hash, size },
                rx: data,
                tx,
                ..
            }) => {
                let entry = self.get_or_create_entry(hash);
                self.spawn(import_bao(entry, size, data, tx));
            }
            // Observing creates the entry if it does not exist yet.
            Command::Observe(ObserveMsg {
                inner: ObserveRequest { hash },
                tx,
                ..
            }) => {
                let entry = self.get_or_create_entry(hash);
                self.spawn(observe(entry, tx));
            }
            // The three import commands below produce an `ImportEntry`; the
            // entry is stored later by `finish_import`, not by the task itself.
            Command::ImportBytes(ImportBytesMsg {
                inner:
                    ImportBytesRequest {
                        data,
                        scope,
                        format,
                        ..
                    },
                tx,
                ..
            }) => {
                self.spawn(import_bytes(data, scope, format, tx));
            }
            Command::ImportByteStream(ImportByteStreamMsg { inner, tx, rx, .. }) => {
                self.spawn(import_byte_stream(inner.scope, inner.format, rx, tx));
            }
            Command::ImportPath(cmd) => {
                self.spawn(import_path(cmd));
            }
            // Exports look the entry up without creating it; a missing entry is
            // reported by the export task.
            Command::ExportBao(ExportBaoMsg {
                inner: ExportBaoRequest { hash, ranges },
                tx,
                ..
            }) => {
                let entry = self.get(&hash);
                self.spawn(export_bao(entry, ranges, tx))
            }
            Command::ExportPath(cmd) => {
                let entry = self.get(&cmd.hash);
                self.spawn(export_path(entry, cmd));
            }
            // Remove every tag in the half-open range `from..to`; a missing
            // bound leaves that side unbounded.
            Command::DeleteTags(cmd) => {
                let DeleteTagsMsg {
                    inner: DeleteTagsRequest { from, to },
                    tx,
                    ..
                } = cmd;
                info!("deleting tags from {:?} to {:?}", from, to);
                self.state.tags.retain(|tag, _| {
                    if let Some(from) = &from {
                        if tag < from {
                            return true;
                        }
                    }
                    if let Some(to) = &to {
                        if tag >= to {
                            return true;
                        }
                    }
                    info!("    removing {:?}", tag);
                    false
                });
                tx.send(Ok(())).await.ok();
            }
            // Move the value stored under `from` to `to`; replies with a
            // `NotFound` error if `from` does not exist.
            Command::RenameTag(cmd) => {
                let RenameTagMsg {
                    inner: RenameTagRequest { from, to },
                    tx,
                    ..
                } = cmd;
                let tags = &mut self.state.tags;
                let value = match tags.remove(&from) {
                    Some(value) => value,
                    None => {
                        tx.send(Err(api::Error::io(
                            io::ErrorKind::NotFound,
                            format!("tag not found: {from:?}"),
                        )))
                        .await
                        .ok();
                        return None;
                    }
                };
                tags.insert(to, value);
                tx.send(Ok(())).await.ok();
                return None;
            }
            // List tags in the half-open range `from..to` whose format is
            // selected by the `raw` / `hash_seq` flags.
            Command::ListTags(cmd) => {
                let ListTagsMsg {
                    inner:
                        ListTagsRequest {
                            from,
                            to,
                            raw,
                            hash_seq,
                        },
                    tx,
                    ..
                } = cmd;
                let tags = self
                    .state
                    .tags
                    .iter()
                    .filter(move |(tag, value)| {
                        if let Some(from) = &from {
                            if tag < &from {
                                return false;
                            }
                        }
                        if let Some(to) = &to {
                            if tag >= &to {
                                return false;
                            }
                        }
                        raw && value.format.is_raw() || hash_seq && value.format.is_hash_seq()
                    })
                    .map(|(tag, value)| TagInfo {
                        name: tag.clone(),
                        hash: value.hash,
                        format: value.format,
                    })
                    .map(Ok);
                tx.send(tags.collect()).await.ok();
            }
            // Insert the tag, replacing any value already stored under that name.
            Command::SetTag(SetTagMsg {
                inner: SetTagRequest { name: tag, value },
                tx,
                ..
            }) => {
                self.state.tags.insert(tag, value);
                tx.send(Ok(())).await.ok();
            }
            // `Tag::auto` receives the current time and a predicate telling it
            // whether a candidate name is already taken; presumably it returns
            // an unused name (defined elsewhere).
            Command::CreateTag(CreateTagMsg {
                inner: CreateTagRequest { value },
                tx,
                ..
            }) => {
                let tag = Tag::auto(SystemTime::now(), |tag| self.state.tags.contains_key(tag));
                self.state.tags.insert(tag.clone(), value);
                tx.send(Ok(tag)).await.ok();
            }
            Command::CreateTempTag(cmd) => {
                trace!("{cmd:?}");
                self.create_temp_tag(cmd).await;
            }
            Command::ListTempTags(cmd) => {
                trace!("{cmd:?}");
                let tts = self.temp_tags.list();
                cmd.tx.send(tts).await.ok();
            }
            // Snapshot the keys first so the spawned task does not borrow the
            // state; the task stops as soon as a send fails.
            Command::ListBlobs(cmd) => {
                let ListBlobsMsg { tx, .. } = cmd;
                let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
                self.spawn(async move {
                    for blob in blobs {
                        if tx.send(Ok(blob)).await.is_err() {
                            break;
                        }
                    }
                });
            }
            // Report `NotFound`, `Complete` or `Partial` based on the entry's
            // current bitfield.
            Command::BlobStatus(cmd) => {
                trace!("{cmd:?}");
                let BlobStatusMsg {
                    inner: BlobStatusRequest { hash },
                    tx,
                    ..
                } = cmd;
                let res = match self.get(&hash) {
                    None => api::blobs::BlobStatus::NotFound,
                    Some(x) => {
                        let bitfield = x.0.state.borrow().bitfield();
                        if bitfield.is_complete() {
                            BlobStatus::Complete {
                                size: bitfield.size,
                            }
                        } else {
                            BlobStatus::Partial {
                                size: bitfield.validated_size(),
                            }
                        }
                    }
                };
                tx.send(res).await.ok();
            }
            // Protected hashes are skipped unless the request sets `force`.
            Command::DeleteBlobs(cmd) => {
                trace!("{cmd:?}");
                let DeleteBlobsMsg {
                    inner: BlobDeleteRequest { hashes, force },
                    tx,
                    ..
                } = cmd;
                for hash in hashes {
                    if !force && self.protected.contains(&hash) {
                        continue;
                    }
                    self.state.data.remove(&hash);
                }
                tx.send(Ok(())).await.ok();
            }
            // Each batch gets a fresh temp tag scope; the scope is ended in
            // `run` once the batch task has finished.
            Command::Batch(cmd) => {
                trace!("{cmd:?}");
                let (id, scope) = self.temp_tags.create_scope();
                self.spawn(handle_batch(cmd, id, scope));
            }
            Command::ClearProtected(cmd) => {
                self.protected.clear();
                cmd.tx.send(Ok(())).await.ok();
            }
            Command::ExportRanges(cmd) => {
                let entry = self.get(&cmd.hash);
                self.spawn(export_ranges(cmd, entry));
            }
            // No work is done here; the request is acknowledged immediately.
            Command::SyncDb(SyncDbMsg { tx, .. }) => {
                tx.send(Ok(())).await.ok();
            }
            // Hand the shutdown message to the caller, which acknowledges it.
            Command::Shutdown(cmd) => {
                return Some(cmd);
            }
        }
        None
    }
    /// Looks up the handle for `hash` without creating an entry.
    ///
    /// The empty hash always resolves to the dedicated `empty_hash` handle;
    /// any other hash yields `None` when it is not in the data map.
    fn get(&mut self, hash: &Hash) -> Option<BaoFileHandle> {
        if *hash != Hash::EMPTY {
            return self.state.data.get(hash).cloned();
        }
        Some(self.state.empty_hash.clone())
    }
    /// Returns the handle for `hash`, inserting a new partial entry if the
    /// hash is not known yet.
    ///
    /// The empty hash is never stored in the data map; it always resolves to
    /// the dedicated `empty_hash` handle.
    fn get_or_create_entry(&mut self, hash: Hash) -> BaoFileHandle {
        if hash == Hash::EMPTY {
            return self.state.empty_hash.clone();
        }
        let handle = self
            .state
            .data
            .entry(hash)
            .or_insert_with(|| BaoFileHandle::new_partial(hash));
        handle.clone()
    }
    /// Creates a temp tag for the requested scope and value and sends it back.
    ///
    /// When the reply channel is an rpc channel, the tag is leaked (`leak`)
    /// before it is sent. A failed send is ignored.
    async fn create_temp_tag(&mut self, cmd: CreateTempTagMsg) {
        let CreateTempTagMsg { inner, tx, .. } = cmd;
        let mut temp_tag = self.temp_tags.create(inner.scope, inner.value);
        let over_rpc = tx.is_rpc();
        if over_rpc {
            temp_tag.leak();
        }
        tx.send(temp_tag).await.ok();
    }
    async fn finish_import(&mut self, res: anyhow::Result<ImportEntry>) {
        let import_data = match res {
            Ok(entry) => entry,
            Err(e) => {
                error!("import failed: {e}");
                return;
            }
        };
        let hash = import_data.outboard.root().into();
        let entry = self.get_or_create_entry(hash);
        entry
            .0
            .state
            .send_if_modified(|state: &mut BaoFileStorage| {
                let BaoFileStorage::Partial(_) = state.deref() else {
                    return false;
                };
                *state =
                    CompleteStorage::new(import_data.data, import_data.outboard.data.into()).into();
                true
            });
        let tt = self.temp_tags.create(
            import_data.scope,
            HashAndFormat {
                hash,
                format: import_data.format,
            },
        );
        import_data.tx.send(AddProgressItem::Done(tt)).await.ok();
    }
    /// Unwraps the outcome of a joined task, logging join errors.
    ///
    /// Cancelled tasks are logged at trace level, any other join error at
    /// error level. In both cases `None` is returned.
    fn log_task_result(&self, res: Result<TaskResult, JoinError>) -> Option<TaskResult> {
        let e = match res {
            Ok(output) => return Some(output),
            Err(e) => e,
        };
        if e.is_cancelled() {
            trace!("task cancelled: {e}");
        } else {
            error!("task failed: {e}");
        }
        None
    }
    /// Runs the actor until the command channel closes or a shutdown command
    /// arrives.
    ///
    /// Each iteration either handles the next command or processes the result
    /// of a finished background task. On shutdown, the shutdown message is
    /// acknowledged after the loop has ended.
    pub async fn run(mut self) {
        let shutdown = loop {
            tokio::select! {
                cmd = self.commands.recv() => {
                    match cmd {
                        // All senders are gone: stop without acknowledging anything.
                        None => break None,
                        Some(cmd) => {
                            if let Some(shutdown) = self.handle_command(cmd).await {
                                break Some(shutdown);
                            }
                        }
                    }
                }
                Some(joined) = self.tasks.join_next(), if !self.tasks.is_empty() => {
                    match self.log_task_result(joined) {
                        Some(TaskResult::Import(res)) => {
                            self.finish_import(res).await;
                        }
                        Some(TaskResult::Scope(scope)) => {
                            self.temp_tags.end_scope(scope);
                        }
                        // Nothing to do for unit results or tasks that failed to join.
                        Some(TaskResult::Unit(_)) | None => {}
                    }
                }
            }
        };
        if let Some(shutdown) = shutdown {
            shutdown.tx.send(()).await.ok();
        }
    }
}
/// Runs a batch to completion and returns its scope id.
///
/// Errors from the batch are logged and otherwise ignored, so the id is
/// returned whether or not the batch failed.
async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>) -> Scope {
    match handle_batch_impl(cmd, id, &scope).await {
        Ok(()) => {}
        Err(cause) => {
            error!("batch failed: {cause}");
        }
    }
    id
}
/// Sends the scope id to the client, then processes batch messages until the
/// stream ends.
///
/// `Drop` messages are forwarded to the scope's `on_drop`; `Ping` messages
/// are ignored.
///
/// # Errors
///
/// Fails if the scope id cannot be sent or if receiving a message fails.
async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>) -> api::Result<()> {
    let BatchMsg { tx, mut rx, .. } = cmd;
    trace!("created scope {}", id);
    tx.send(id).await.map_err(api::Error::other)?;
    loop {
        let Some(msg) = rx.recv().await? else {
            return Ok(());
        };
        match msg {
            BatchResponse::Ping => {}
            BatchResponse::Drop(msg) => scope.on_drop(&msg),
        }
    }
}
/// Exports the requested byte ranges of `entry` to the sender in `cmd`.
///
/// A missing entry is reported as a `NotFound` error item. Any error from the
/// export itself is sent as an error item as well. Failures to send the error
/// item are ignored.
async fn export_ranges(mut cmd: ExportRangesMsg, entry: Option<BaoFileHandle>) {
    let outcome = match entry {
        Some(entry) => export_ranges_impl(cmd.inner, &mut cmd.tx, entry).await,
        None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
    };
    if let Err(cause) = outcome {
        cmd.tx
            .send(ExportRangesItem::Error(cause.into()))
            .await
            .ok();
    }
}
/// Streams the requested byte ranges of `entry` as leaf items.
///
/// Every requested range is clamped to the blob size taken from the entry's
/// bitfield and then sent in pieces of at most 1024 bytes. At least one leaf
/// is sent per range, even if the clamped range is empty.
///
/// # Errors
///
/// Fails if a clamped range is not fully covered by the bitfield, if reading
/// the data fails, or if sending an item fails.
async fn export_ranges_impl(
    cmd: ExportRangesRequest,
    tx: &mut mpsc::Sender<ExportRangesItem>,
    entry: BaoFileHandle,
) -> io::Result<()> {
    /// Maximum number of bytes per emitted leaf.
    const PIECE_SIZE: u64 = 1024;
    let ExportRangesRequest { ranges, hash } = cmd;
    let bitfield = entry.bitfield();
    trace!(
        "exporting ranges: {hash} {ranges:?} size={}",
        bitfield.size()
    );
    debug_assert!(entry.hash() == hash, "hash mismatch");
    let data = entry.data_reader();
    let size = bitfield.size();
    for range in ranges.iter() {
        // Clamp both ends to the blob size.
        let (start, end) = match range {
            RangeSetRange::Range(range) => (size.min(*range.start), size.min(*range.end)),
            RangeSetRange::RangeFrom(range) => (size.min(*range.start), size),
        };
        let requested = ChunkRanges::bytes(start..end);
        if !bitfield.ranges.is_superset(&requested) {
            return Err(io::Error::other(format!(
                "missing range: {requested:?}, present: {bitfield:?}",
            )));
        }
        let mut offset = start;
        loop {
            let piece_end: u64 = (offset + PIECE_SIZE).min(end);
            let len = (piece_end - offset) as usize;
            let leaf = Leaf {
                offset,
                data: data.read_bytes_at(offset, len)?,
            };
            tx.send(leaf.into()).await?;
            offset = piece_end;
            if offset >= end {
                break;
            }
        }
    }
    Ok(())
}
fn chunk_range(leaf: &Leaf) -> ChunkRanges {
    let start = ChunkNum::chunks(leaf.offset);
    let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64);
    (start..end).into()
}
/// Imports bao content items from `stream` into `entry`.
///
/// The declared `size` is recorded on the entry first, if it is still
/// partial. Parent items are written into the outboard and leaf items into
/// the data. Each leaf also updates the bitfield, and once the bitfield is
/// complete the entry is converted to complete storage. Items that arrive
/// after the entry is complete are ignored.
///
/// When the stream ends, `Ok(())` is sent on `tx`. If receiving from the
/// stream fails, the error is sent on `tx` instead and the import stops.
/// Failures to send the reply are ignored.
async fn import_bao(
    entry: BaoFileHandle,
    size: NonZeroU64,
    mut stream: mpsc::Receiver<BaoContentItem>,
    tx: irpc::channel::oneshot::Sender<api::Result<()>>,
) {
    let size = size.get();
    entry
        .0
        .state
        .send_if_modified(|state: &mut BaoFileStorage| {
            let BaoFileStorage::Partial(entry) = state else {
                return false;
            };
            entry.size.write(0, size);
            // Recording the size alone does not notify observers.
            false
        });
    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
    loop {
        let item = match stream.recv().await {
            Ok(Some(item)) => item,
            Ok(None) => break,
            Err(cause) => {
                // Report the failure to the caller instead of panicking the task.
                tx.send(Err(cause.into())).await.ok();
                return;
            }
        };
        entry.0.state.send_if_modified(|state| {
            let BaoFileStorage::Partial(partial) = state else {
                return false;
            };
            match item {
                BaoContentItem::Parent(parent) => {
                    // Nodes without a pre-order offset are skipped.
                    if let Some(offset) = tree.pre_order_offset(parent.node) {
                        let mut pair = [0u8; 64];
                        pair[..32].copy_from_slice(parent.pair.0.as_bytes());
                        pair[32..].copy_from_slice(parent.pair.1.as_bytes());
                        partial
                            .outboard
                            .write_at(offset * 64, &pair)
                            .expect("writing to mem can never fail");
                    }
                    // Parent items never change the bitfield, so no notification.
                    false
                }
                BaoContentItem::Leaf(leaf) => {
                    let start = leaf.offset;
                    partial
                        .data
                        .write_at(start, &leaf.data)
                        .expect("writing to mem can never fail");
                    let added = chunk_range(&leaf);
                    let update = partial.bitfield.update(&Bitfield::new(added.clone(), size));
                    if update.new_state().complete {
                        // All chunks are present: switch to complete storage.
                        let data = std::mem::take(&mut partial.data);
                        let outboard = std::mem::take(&mut partial.outboard);
                        let data: Bytes = <Vec<u8>>::try_from(data).unwrap().into();
                        let outboard: Bytes = <Vec<u8>>::try_from(outboard).unwrap().into();
                        *state = CompleteStorage::new(data, outboard).into();
                    }
                    update.changed()
                }
            }
        });
    }
    tx.send(Ok(())).await.ok();
}
/// Exports the given chunk ranges of `entry` as a stream of encoded items.
///
/// A missing entry is reported as a `NotFound` encode error. Otherwise the
/// entry's hash is recorded on the current span and the ranges are traversed
/// with validation. The traversal result and send failures are ignored.
#[instrument(skip_all, fields(hash = tracing::field::Empty))]
async fn export_bao(
    entry: Option<BaoFileHandle>,
    ranges: ChunkRanges,
    mut sender: mpsc::Sender<EncodedItem>,
) {
    let entry = match entry {
        Some(entry) => entry,
        None => {
            let not_found = io::Error::new(io::ErrorKind::NotFound, "hash not found");
            sender.send(EncodeError::Io(not_found).into()).await.ok();
            return;
        }
    };
    tracing::Span::current().record("hash", tracing::field::display(entry.hash));
    let data = entry.data_reader();
    let outboard = entry.outboard_reader();
    let tx = BaoTreeSender::new(&mut sender);
    let _ = traverse_ranges_validated(data, outboard, &ranges, tx).await;
}
/// Forwards the bitfield of `entry`, and every later change to it, to `tx`.
///
/// The error that ends the forwarding is ignored.
#[instrument(skip_all, fields(hash = %entry.hash.fmt_short()))]
async fn observe(entry: BaoFileHandle, tx: mpsc::Sender<api::blobs::Bitfield>) {
    entry.subscribe().forward(tx).await.ok();
}
/// Prepares an in-memory import of `data`.
///
/// Reports the size and `CopyDone` on `tx`, computes the outboard for the
/// data, and returns everything the actor needs to finish the import.
///
/// # Errors
///
/// Fails if a progress item cannot be sent.
async fn import_bytes(
    data: Bytes,
    scope: Scope,
    format: BlobFormat,
    tx: mpsc::Sender<AddProgressItem>,
) -> anyhow::Result<ImportEntry> {
    let size = data.len() as u64;
    tx.send(AddProgressItem::Size(size)).await?;
    tx.send(AddProgressItem::CopyDone).await?;
    let outboard = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
    let entry = ImportEntry {
        scope,
        format,
        data,
        outboard,
        tx,
    };
    Ok(entry)
}
async fn import_byte_stream(
    scope: Scope,
    format: BlobFormat,
    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
    tx: mpsc::Sender<AddProgressItem>,
) -> anyhow::Result<ImportEntry> {
    let mut res = Vec::new();
    loop {
        match rx.recv().await {
            Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
                res.extend_from_slice(&data);
                tx.send(AddProgressItem::CopyProgress(res.len() as u64))
                    .await?;
            }
            Ok(Some(ImportByteStreamUpdate::Done)) => {
                break;
            }
            Ok(None) => {
                return Err(api::Error::io(
                    io::ErrorKind::UnexpectedEof,
                    "byte stream ended unexpectedly",
                )
                .into());
            }
            Err(e) => {
                return Err(e.into());
            }
        }
    }
    import_bytes(res.into(), scope, format, tx).await
}
/// Reads the file at the requested path into memory and imports it via
/// `import_bytes`.
///
/// The file is read in pieces of up to 64 KiB. After each piece the total
/// number of bytes read so far is reported as `CopyProgress`.
///
/// # Errors
///
/// Fails if the file cannot be opened or read, or if a progress item cannot
/// be sent.
#[instrument(skip_all, fields(path = %cmd.path.display()))]
async fn import_path(cmd: ImportPathMsg) -> anyhow::Result<ImportEntry> {
    let ImportPathMsg {
        inner:
            ImportPathRequest {
                path,
                scope,
                format,
                ..
            },
        tx,
        ..
    } = cmd;
    let mut contents = Vec::new();
    let mut file = tokio::fs::File::open(path).await?;
    let mut buf = [0u8; 1024 * 64];
    loop {
        match file.read(&mut buf).await? {
            // A zero-length read means end of file.
            0 => break,
            n => {
                contents.extend_from_slice(&buf[..n]);
                tx.send(AddProgressItem::CopyProgress(contents.len() as u64))
                    .await?;
            }
        }
    }
    import_bytes(contents.into(), scope, format, tx).await
}
#[instrument(skip_all, fields(hash = %cmd.hash.fmt_short(), path = %cmd.target.display()))]
async fn export_path(entry: Option<BaoFileHandle>, cmd: ExportPathMsg) {
    let ExportPathMsg { inner, mut tx, .. } = cmd;
    let Some(entry) = entry else {
        tx.send(ExportProgressItem::Error(api::Error::io(
            io::ErrorKind::NotFound,
            "hash not found",
        )))
        .await
        .ok();
        return;
    };
    match export_path_impl(entry, inner, &mut tx).await {
        Ok(()) => tx.send(ExportProgressItem::Done).await.ok(),
        Err(e) => tx.send(ExportProgressItem::Error(e.into())).await.ok(),
    };
}
/// Writes the data of `entry` to the absolute path `cmd.target`.
///
/// Missing parent directories are created. The size is reported first, then
/// the data is copied in pieces of up to 64 KiB. After each piece a
/// `CopyProgress` item carrying the piece's start offset is sent with
/// `try_send`, and the task yields before copying the next piece.
///
/// # Errors
///
/// Returns `InvalidInput` if the target is not absolute, and otherwise fails
/// if creating directories or the file, reading the entry, writing the file,
/// or sending a progress item fails.
async fn export_path_impl(
    entry: BaoFileHandle,
    cmd: ExportPathRequest,
    tx: &mut mpsc::Sender<ExportProgressItem>,
) -> io::Result<()> {
    /// Number of bytes copied per iteration.
    const PIECE_SIZE: usize = 1024 * 64;
    let ExportPathRequest { target, .. } = cmd;
    if !target.is_absolute() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "path is not absolute",
        ));
    }
    if let Some(parent) = target.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let mut file = std::fs::File::create(target)?;
    let size = entry.0.state.borrow().size();
    tx.send(ExportProgressItem::Size(size)).await?;
    let mut buf = [0u8; PIECE_SIZE];
    for offset in (0..size).step_by(PIECE_SIZE) {
        let len = std::cmp::min(size - offset, PIECE_SIZE as u64) as usize;
        let buf = &mut buf[..len];
        // The state borrow is a temporary, so it is released before the await below.
        entry.0.state.borrow().data().read_exact_at(offset, buf)?;
        file.write_all(buf)?;
        tx.try_send(ExportProgressItem::CopyProgress(offset))
            .await
            .map_err(|_e| io::Error::other("failed to send export progress"))?;
        yield_now().await;
    }
    Ok(())
}
/// Result of a successful import task, handed to the actor to be stored.
struct ImportEntry {
    /// Scope in which the temp tag for the imported blob is created.
    scope: Scope,
    /// Format recorded in the temp tag.
    format: BlobFormat,
    /// The imported bytes.
    data: Bytes,
    /// Outboard computed for `data`; its root is the blob hash.
    outboard: PreOrderMemOutboard,
    /// Progress sender on which the final `Done` item is sent.
    tx: mpsc::Sender<AddProgressItem>,
}
/// Reader over the data of a [`BaoFileHandle`].
pub struct DataReader(BaoFileHandle);
impl ReadBytesAt for DataReader {
    /// Reads from the entry's current data; the state is borrowed only for
    /// the duration of this call.
    fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result<Bytes> {
        let entry = self.0 .0.state.borrow();
        entry.data().read_bytes_at(offset, size)
    }
}
/// Reader over the outboard of a [`BaoFileHandle`].
pub struct OutboardReader {
    /// Root hash reported by `Outboard::root`.
    hash: blake3::Hash,
    /// Tree geometry used to locate hash pairs in the outboard.
    tree: BaoTree,
    /// Handle whose outboard bytes are read.
    data: BaoFileHandle,
}
impl Outboard for OutboardReader {
    /// Returns the root hash this reader was created with.
    fn root(&self) -> blake3::Hash {
        self.hash
    }

    /// Returns the tree geometry this reader was created with.
    fn tree(&self) -> BaoTree {
        self.tree
    }

    /// Loads the hash pair stored for `node`.
    ///
    /// Returns `Ok(None)` if the node has no pre-order offset in the tree,
    /// and an `UnexpectedEof` error if fewer than 64 bytes could be read at
    /// the node's position.
    fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        let offset = match self.tree.pre_order_offset(node) {
            Some(offset) => offset,
            None => return Ok(None),
        };
        // Each node occupies 64 bytes: two 32-byte hashes.
        let mut pair = [0u8; 64];
        let storage = self.data.0.state.borrow();
        let read = storage.outboard().read_at(offset * 64, &mut pair)?;
        if read != 64 {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "short read"));
        }
        let (left, right) = pair.split_at(32);
        let left = <[u8; 32]>::try_from(left).unwrap();
        let right = <[u8; 32]>::try_from(right).unwrap();
        Ok(Some((blake3::Hash::from(left), blake3::Hash::from(right))))
    }
}
/// Blob and tag state owned by the actor.
struct State {
    /// All blobs except the empty blob, keyed by hash.
    data: HashMap<Hash, BaoFileHandle>,
    /// Persistent tags, ordered by name.
    tags: BTreeMap<Tag, HashAndFormat>,
    /// Dedicated handle returned for `Hash::EMPTY`.
    empty_hash: BaoFileHandle,
}
/// Storage of a single blob, either still partial or complete.
#[derive(Debug, derive_more::From)]
pub enum BaoFileStorage {
    /// Data and outboard that are still being filled in, with a bitfield
    /// tracking what is present.
    Partial(PartialMemStorage),
    /// Complete data and outboard.
    Complete(CompleteStorage),
}
impl BaoFileStorage {
    /// Returns the bitfield describing which parts of the blob are present.
    ///
    /// Partial storage returns a clone of its tracked bitfield; complete
    /// storage returns a complete bitfield of the data size.
    pub fn bitfield(&self) -> Bitfield {
        match self {
            Self::Complete(complete) => Bitfield::complete(complete.size()),
            Self::Partial(partial) => partial.bitfield.clone(),
        }
    }
}
/// Shared inner part of a [`BaoFileHandle`].
#[derive(Debug)]
pub struct BaoFileHandleInner {
    /// Watch channel holding the storage; subscribers are notified of changes.
    state: watch::Sender<BaoFileStorage>,
    /// Hash of the blob this handle refers to.
    hash: Hash,
}
/// Cloneable, reference-counted handle to the storage of a single blob.
///
/// Dereferences to [`BaoFileHandleInner`].
#[derive(Debug, Clone, derive_more::Deref)]
pub struct BaoFileHandle(Arc<BaoFileHandleInner>);
impl BaoFileHandle {
    /// Creates a handle for `hash` with empty partial storage.
    pub fn new_partial(hash: Hash) -> Self {
        let storage = PartialMemStorage {
            data: SparseMemFile::new(),
            outboard: SparseMemFile::new(),
            size: SizeInfo::default(),
            bitfield: Bitfield::empty(),
        };
        // Only the sender is kept; receivers are created by `subscribe`.
        let (state, _) = watch::channel(BaoFileStorage::Partial(storage));
        let inner = BaoFileHandleInner { state, hash };
        Self(Arc::new(inner))
    }

    /// Returns the hash of the blob this handle refers to.
    pub fn hash(&self) -> Hash {
        self.0.hash
    }

    /// Returns the current bitfield of the blob.
    pub fn bitfield(&self) -> Bitfield {
        let storage = self.0.state.borrow();
        storage.bitfield()
    }

    /// Creates a subscriber that observes changes to the storage.
    pub fn subscribe(&self) -> BaoFileStorageSubscriber {
        let receiver = self.0.state.subscribe();
        BaoFileStorageSubscriber::new(receiver)
    }

    /// Creates a reader over the blob's data.
    pub fn data_reader(&self) -> DataReader {
        DataReader(self.clone())
    }

    /// Creates a reader over the blob's outboard.
    ///
    /// The tree geometry is computed from the size of the storage at the time
    /// of the call.
    pub fn outboard_reader(&self) -> OutboardReader {
        let size = self.0.state.borrow().size();
        OutboardReader {
            hash: self.0.hash.into(),
            tree: BaoTree::new(size, IROH_BLOCK_SIZE),
            data: self.clone(),
        }
    }
}
/// The default storage is partial storage with default contents.
impl Default for BaoFileStorage {
    fn default() -> Self {
        Self::Partial(Default::default())
    }
}
impl BaoFileStorage {
    /// Returns the data bytes currently held by the storage.
    fn data(&self) -> &[u8] {
        match self {
            Self::Complete(complete) => &complete.data,
            Self::Partial(partial) => partial.data.as_ref(),
        }
    }

    /// Returns the outboard bytes currently held by the storage.
    fn outboard(&self) -> &[u8] {
        match self {
            Self::Complete(complete) => &complete.outboard,
            Self::Partial(partial) => partial.outboard.as_ref(),
        }
    }

    /// Returns the size of the blob: the data length for complete storage,
    /// `current_size()` for partial storage.
    fn size(&self) -> u64 {
        match self {
            Self::Complete(complete) => complete.size(),
            Self::Partial(partial) => partial.current_size(),
        }
    }
}
/// Storage of a complete blob: its data together with its outboard.
#[derive(Debug, Clone)]
pub struct CompleteStorage {
    /// The blob data.
    pub(crate) data: Bytes,
    /// The outboard bytes for `data`.
    pub(crate) outboard: Bytes,
}
impl CompleteStorage {
    /// Computes the outboard for `data` and returns the resulting hash
    /// together with the complete storage.
    pub fn create(data: Bytes) -> (Hash, Self) {
        let computed = PreOrderMemOutboard::create(&data, IROH_BLOCK_SIZE);
        let hash: Hash = computed.root().into();
        let storage = Self::new(data, computed.data.into());
        (hash, storage)
    }

    /// Creates complete storage from data and an already computed outboard.
    pub fn new(data: Bytes, outboard: Bytes) -> Self {
        Self { data, outboard }
    }

    /// Returns the length of the data in bytes.
    pub fn size(&self) -> u64 {
        let len = self.data.len();
        len as u64
    }
}
/// Debug helper that prints every 64-byte hash pair of an outboard.
///
/// # Panics
///
/// Panics if the length of `hashes` is not a multiple of 64.
#[allow(dead_code)]
fn print_outboard(hashes: &[u8]) {
    assert!(hashes.len() % 64 == 0);
    for pair in hashes.chunks(64) {
        let (first, second) = pair.split_at(32);
        let left = blake3::Hash::from(<[u8; 32]>::try_from(first).unwrap());
        let right = blake3::Hash::from(<[u8; 32]>::try_from(second).unwrap());
        println!("l: {left:?}, r: {right:?}");
    }
}
/// Subscriber that turns storage changes into a stream of bitfields.
pub struct BaoFileStorageSubscriber {
    /// Receiver side of the storage watch channel.
    receiver: watch::Receiver<BaoFileStorage>,
}
impl BaoFileStorageSubscriber {
    /// Creates a subscriber from a watch receiver.
    pub fn new(receiver: watch::Receiver<BaoFileStorage>) -> Self {
        Self { receiver }
    }

    /// Sends the current bitfield, then the full bitfield after every change.
    ///
    /// This only returns with an error: when `tx` is closed, when a send
    /// fails, or when the watch channel is closed.
    pub async fn forward(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
        let initial = self.receiver.borrow().bitfield();
        tx.send(initial).await?;
        loop {
            self.update_or_closed(&mut tx).await?;
            // Take the snapshot in its own statement so the borrow guard is
            // released before awaiting the send.
            let current = self.receiver.borrow().bitfield();
            tx.send(current).await?;
        }
    }

    /// Sends the current bitfield, then only the difference to the previously
    /// sent state after every change. Empty differences are not sent.
    ///
    /// This only returns with an error: when `tx` is closed, when a send
    /// fails, or when the watch channel is closed.
    #[allow(dead_code)]
    pub async fn forward_delta(mut self, mut tx: mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
        let initial = self.receiver.borrow().bitfield();
        let mut previous = initial.clone();
        tx.send(initial).await?;
        loop {
            self.update_or_closed(&mut tx).await?;
            let current = self.receiver.borrow().bitfield();
            let delta = previous.diff(&current);
            if !delta.is_empty() {
                tx.send(delta).await?;
                previous = current;
            }
        }
    }

    /// Waits until the storage changes or `tx` is closed.
    ///
    /// Returns `Ok(())` on a change, and an error if `tx` was closed or the
    /// watch channel was closed.
    async fn update_or_closed(&mut self, tx: &mut mpsc::Sender<Bitfield>) -> anyhow::Result<()> {
        tokio::select! {
            _ = tx.closed() => {
                Err(irpc::channel::SendError::ReceiverClosed.into())
            }
            changed = self.receiver.changed() => {
                changed?;
                Ok(())
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use n0_future::StreamExt;
    use testresult::TestResult;
    use super::*;
    /// Round trip: add bytes to one store, export them as bao, import the
    /// export into a second store, and check that exporting from the second
    /// store yields the same bytes.
    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let store = MemStore::new();
        let tt = store.add_bytes(vec![0u8; 1024 * 64]).temp_tag().await?;
        let hash = *tt.hash();
        println!("hash: {hash:?}");
        // Drain the export once as a stream, printing every item.
        let mut stream = store.export_bao(hash, ChunkRanges::all()).stream();
        while let Some(item) = stream.next().await {
            println!("item: {item:?}");
        }
        // Export again, this time collecting into a byte vector.
        let stream = store.export_bao(hash, ChunkRanges::all());
        let exported = stream.bao_to_vec().await?;
        let store2 = MemStore::new();
        // Observe the hash in the second store before importing and print the
        // events in the background.
        let mut or = store2.observe(hash).stream().await?;
        tokio::spawn(async move {
            while let Some(event) = or.next().await {
                println!("event: {event:?}");
            }
        });
        store2
            .import_bao_bytes(hash, ChunkRanges::all(), exported.clone())
            .await?;
        let exported2 = store2
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await?;
        assert_eq!(exported, exported2);
        Ok(())
    }
}