use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
io,
path::PathBuf,
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
time::SystemTime,
};
use bao_tree::{
io::{fsm::Outboard, outboard::PreOrderOutboard, sync::WriteAt},
BaoTree,
};
use bytes::{Bytes, BytesMut};
use futures_lite::{Stream, StreamExt};
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use super::{
temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode,
ImportProgress, Map, TempCounterMap,
};
use crate::{
store::{
mutable_mem_storage::MutableMemStorage, BaoBlobSize, MapEntry, MapEntryMut, ReadableStore,
},
util::{
progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
TagCounter, TagDrop,
},
Tag, TempTag, IROH_BLOCK_SIZE,
};
/// A blob store handle.
///
/// All state lives behind a reference-counted [`StoreInner`], so cloning the
/// handle clones the `Arc` and every clone refers to the same state.
#[derive(Debug, Clone, Default)]
pub struct Store {
// Shared, lock-protected state of the store.
inner: Arc<StoreInner>,
}
/// The shared part of a [`Store`]: a single `RwLock` guarding [`StateInner`].
#[derive(Debug, Default)]
struct StoreInner(RwLock<StateInner>);
impl TagDrop for StoreInner {
    /// Decrements the temp counter for `inner` under the state write lock.
    ///
    /// # Panics
    ///
    /// Panics if the state lock is poisoned.
    fn on_drop(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tag drop: {:?}", inner);
        self.0.write().unwrap().temp.dec(inner);
    }
}
impl TagCounter for StoreInner {
    /// Increments the temp counter for `inner` under the state write lock.
    ///
    /// # Panics
    ///
    /// Panics if the state lock is poisoned.
    fn on_create(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tagging: {:?}", inner);
        self.0.write().unwrap().temp.inc(inner);
    }
}
impl Store {
    /// Creates a new, empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Takes the write lock on the shared state.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn write_lock(&self) -> RwLockWriteGuard<'_, StateInner> {
        self.inner.0.write().unwrap()
    }

    /// Takes the read lock on the shared state.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn read_lock(&self) -> RwLockReadGuard<'_, StateInner> {
        self.inner.0.read().unwrap()
    }

    /// Builds a complete entry from `bytes`, stores it, and returns a temp tag for it.
    ///
    /// Progress is reported under `id`: an `OutboardProgress` at offset 0, then
    /// best-effort `OutboardProgress` updates from the storage callback (send
    /// failures are ignored), then `OutboardDone` carrying the computed hash.
    ///
    /// This is synchronous and uses the blocking progress API; the async callers
    /// in this file run it via `spawn_blocking`.
    ///
    /// # Errors
    ///
    /// Returns an error if one of the mandatory progress messages cannot be sent.
    fn import_bytes_sync(
        &self,
        id: u64,
        bytes: Bytes,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<TempTag> {
        // Bring the store trait into scope so that `temp_tag` resolves on `self`.
        use super::Store;

        progress.blocking_send(ImportProgress::OutboardProgress { id, offset: 0 })?;
        let progress2 = progress.clone();
        let cb = move |offset| {
            progress2
                .try_send(ImportProgress::OutboardProgress { id, offset })
                .ok();
        };
        let (storage, hash) = MutableMemStorage::complete(bytes, cb);
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        // The temp tag is created before the entry becomes visible in the map.
        let tag = self.temp_tag(HashAndFormat { hash, format });
        let entry = Entry {
            inner: Arc::new(EntryInner {
                hash,
                data: RwLock::new(storage),
            }),
            complete: true,
        };
        self.write_lock().entries.insert(hash, entry);
        Ok(tag)
    }

    /// Writes the data of the entry `hash` to the file `target`.
    ///
    /// The parent directory of `target` is created if needed. Data is copied in
    /// chunks of 1 MiB; after each chunk is fully written, `progress` is called
    /// with the chunk's starting offset. `_mode` is ignored.
    ///
    /// The store-wide lock is only held while looking up the entry, not during
    /// file I/O or while `progress` runs.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` if `target` is not absolute or has no parent,
    /// `NotFound` if there is no entry for `hash`, and any error from the file
    /// system or from `progress`.
    fn export_sync(
        &self,
        hash: Hash,
        target: PathBuf,
        _mode: ExportMode,
        progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
    ) -> io::Result<()> {
        const CHUNK_SIZE: usize = 1024 * 1024;

        tracing::trace!("exporting {} to {}", hash, target.display());
        if !target.is_absolute() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path must be absolute",
            ));
        }
        let parent = target.parent().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            )
        })?;
        std::fs::create_dir_all(parent)?;
        // Clone the entry (an `Arc` clone) so the store lock is released at the
        // end of this statement instead of being held across the copy below.
        let entry = self
            .read_lock()
            .entries
            .get(&hash)
            .cloned()
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
        let data = &entry.inner.data;
        let size = data.read().unwrap().current_size();
        let mut file = std::fs::File::create(target)?;
        for offset in (0..size).step_by(CHUNK_SIZE) {
            let bytes = data.read().unwrap().read_data_at(offset, CHUNK_SIZE);
            // `write_at` may write only part of the buffer; `write_all_at`
            // retries until the whole chunk is on disk or an error occurs.
            file.write_all_at(offset, &bytes)?;
            progress(offset)?;
        }
        // Fully qualified because `WriteAt` also has a `flush` method.
        std::io::Write::flush(&mut file)?;
        drop(file);
        Ok(())
    }
}
impl super::Store for Store {
    /// Reads the whole file at `path` into memory and imports it.
    ///
    /// All work, including progress reporting, runs on a blocking thread.
    /// `_mode` is ignored. Returns the temp tag and the size in bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read, a progress message cannot
    /// be sent, or the blocking task fails.
    async fn import_file(
        &self,
        path: std::path::PathBuf,
        _mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || {
            let id = progress.new_id();
            progress.blocking_send(ImportProgress::Found {
                id,
                name: path.to_string_lossy().to_string(),
            })?;
            progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
            let bytes: Bytes = std::fs::read(path)?.into();
            let size = bytes.len() as u64;
            progress.blocking_send(ImportProgress::Size { id, size })?;
            let tag = this.import_bytes_sync(id, bytes, format, progress)?;
            Ok((tag, size))
        })
        .await?
    }

    /// Collects `data` into memory and imports the resulting bytes.
    ///
    /// `CopyProgress` updates are sent best-effort while the stream is drained.
    /// Once the stream ends, the size is reported and the import itself runs on
    /// a blocking thread, so no blocking call is made on the async executor.
    /// Returns the temp tag and the size in bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the stream yields an error, a mandatory progress
    /// message cannot be sent, or the blocking task fails.
    async fn import_stream(
        &self,
        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        let id = progress.new_id();
        let name = temp_name();
        progress.send(ImportProgress::Found { id, name }).await?;
        let mut bytes = BytesMut::new();
        while let Some(chunk) = data.next().await {
            bytes.extend_from_slice(&chunk?);
            progress
                .try_send(ImportProgress::CopyProgress {
                    id,
                    offset: bytes.len() as u64,
                })
                .ok();
        }
        let bytes = bytes.freeze();
        let size = bytes.len() as u64;
        // Async send instead of `blocking_send`: we are still on the executor.
        progress.send(ImportProgress::Size { id, size }).await?;
        // `import_bytes_sync` hashes the data and uses `blocking_send`, so it is
        // moved to a blocking thread, as `import_file` and `import_bytes` do.
        let tag = tokio::task::spawn_blocking(move || {
            this.import_bytes_sync(id, bytes, format, progress)
        })
        .await??;
        Ok((tag, size))
    }

    /// Imports `bytes` on a blocking thread without reporting progress.
    async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || {
            this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
        })
        .await?
    }

    /// Sets the tag `name` to `value`, or removes the tag when `value` is `None`.
    async fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> io::Result<()> {
        let mut state = self.write_lock();
        match value {
            Some(value) => {
                state.tags.insert(name, value);
            }
            None => {
                state.tags.remove(&name);
            }
        }
        Ok(())
    }

    /// Creates a tag with an automatically chosen name that points to `hash`.
    ///
    /// The name comes from `Tag::auto`, which is given the current time and a
    /// predicate telling it which names are already taken.
    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
        let mut state = self.write_lock();
        let tag = Tag::auto(SystemTime::now(), |x| state.tags.contains_key(x));
        state.tags.insert(tag.clone(), hash);
        Ok(tag)
    }

    /// Creates a temp tag for `tag`, counted by the shared store state.
    fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
        self.inner.temp_tag(tag)
    }

    /// Runs the shared garbage collection loop for this store.
    ///
    /// The third argument passed to `gc_run_loop` is a callback that does
    /// nothing and succeeds.
    async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
    where
        G: Fn() -> Gut,
        Gut: Future<Output = BTreeSet<Hash>> + Send,
    {
        super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
    }

    /// Removes the entries for `hashes`, skipping any hash found in the temp map.
    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
        let mut state = self.write_lock();
        for hash in hashes {
            if !state.temp.contains(&hash) {
                state.entries.remove(&hash);
            }
        }
        Ok(())
    }

    /// Does nothing.
    async fn shutdown(&self) {}

    /// Does nothing and always succeeds.
    async fn sync(&self) -> io::Result<()> {
        Ok(())
    }
}
/// The lock-protected state of a [`Store`].
#[derive(Debug, Default)]
struct StateInner {
// Stored entries, keyed by hash.
entries: BTreeMap<Hash, Entry>,
// Named tags and the hash/format each one points to.
tags: BTreeMap<Tag, HashAndFormat>,
// Temp tag counters; `delete` skips hashes that this map contains.
temp: TempCounterMap,
}
/// A store entry: shared storage plus a completeness flag.
///
/// Cloning an entry clones the `Arc`, so clones share the same storage.
#[derive(Debug, Clone)]
pub struct Entry {
// Hash and storage, shared with readers and writers created from this entry.
inner: Arc<EntryInner>,
// Whether the entry is treated as complete (reported by `is_complete` and `size`).
complete: bool,
}
/// The shared part of an [`Entry`].
#[derive(Debug)]
struct EntryInner {
// Hash of the entry.
hash: Hash,
// Data and outboard storage, behind its own lock.
data: RwLock<MutableMemStorage>,
}
impl MapEntry for Entry {
    /// Returns the hash of this entry.
    fn hash(&self) -> Hash {
        self.inner.hash
    }

    /// Returns the current size of the storage, paired with the completeness flag.
    fn size(&self) -> BaoBlobSize {
        let current_size = self.inner.data.read().unwrap().current_size();
        BaoBlobSize::new(current_size, self.complete)
    }

    /// Returns whether this entry is marked complete.
    fn is_complete(&self) -> bool {
        self.complete
    }

    /// Returns a pre-order outboard over the shared storage.
    ///
    /// The tree geometry is derived from the storage's current size and
    /// `IROH_BLOCK_SIZE`.
    async fn outboard(&self) -> io::Result<impl Outboard> {
        let current_size = self.inner.data.read().unwrap().current_size();
        let tree = BaoTree::new(current_size, IROH_BLOCK_SIZE);
        let outboard = PreOrderOutboard {
            root: self.inner.hash.into(),
            tree,
            data: OutboardReader(Arc::clone(&self.inner)),
        };
        Ok(outboard)
    }

    /// Returns a reader over the data part of the shared storage.
    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        let reader = DataReader(Arc::clone(&self.inner));
        Ok(reader)
    }
}
impl MapEntryMut for Entry {
async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
Ok(BatchWriter(self.inner.clone()))
}
}
/// Reader over the data part of an entry's storage.
struct DataReader(Arc<EntryInner>);

impl AsyncSliceReader for DataReader {
    /// Reads `len` bytes of data starting at `offset`, as returned by the storage.
    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
        let chunk = self.0.data.read().unwrap().read_data_at(offset, len);
        Ok(chunk)
    }

    /// Returns the storage's data length.
    async fn size(&mut self) -> std::io::Result<u64> {
        let data_len = self.0.data.read().unwrap().data_len();
        Ok(data_len)
    }
}
/// Reader over the outboard part of an entry's storage.
struct OutboardReader(Arc<EntryInner>);

impl AsyncSliceReader for OutboardReader {
    /// Reads `len` bytes of outboard starting at `offset`, as returned by the storage.
    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
        let chunk = self.0.data.read().unwrap().read_outboard_at(offset, len);
        Ok(chunk)
    }

    /// Returns the storage's outboard length.
    async fn size(&mut self) -> std::io::Result<u64> {
        let outboard_len = self.0.data.read().unwrap().outboard_len();
        Ok(outboard_len)
    }
}
/// Writer that applies batches of bao content items to an entry's storage.
struct BatchWriter(Arc<EntryInner>);

impl super::BaoBatchWriter for BatchWriter {
    /// Applies `batch` to the storage under its write lock.
    ///
    /// `size` is forwarded to the storage unchanged.
    async fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<bao_tree::io::fsm::BaoContentItem>,
    ) -> io::Result<()> {
        let mut storage = self.0.data.write().unwrap();
        storage.write_batch(size, &batch)
    }

    /// Does nothing and always succeeds.
    async fn sync(&mut self) -> io::Result<()> {
        Ok(())
    }
}
impl super::Map for Store {
    type Entry = Entry;

    /// Returns a clone of the entry stored under `hash`, if there is one.
    async fn get(&self, hash: &Hash) -> std::io::Result<Option<Self::Entry>> {
        let state = self.read_lock();
        let found = state.entries.get(hash).cloned();
        Ok(found)
    }
}
impl super::MapMut for Store {
type EntryMut = Entry;
async fn get_mut(&self, hash: &Hash) -> std::io::Result<Option<Self::EntryMut>> {
self.get(hash).await
}
async fn get_or_create(&self, hash: Hash, _size: u64) -> std::io::Result<Entry> {
let entry = Entry {
inner: Arc::new(EntryInner {
hash,
data: RwLock::new(MutableMemStorage::default()),
}),
complete: false,
};
Ok(entry)
}
async fn entry_status(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
self.entry_status_sync(hash)
}
fn entry_status_sync(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
Ok(match self.inner.0.read().unwrap().entries.get(hash) {
Some(entry) => {
if entry.complete {
crate::store::EntryStatus::Complete
} else {
crate::store::EntryStatus::Partial
}
}
None => crate::store::EntryStatus::NotFound,
})
}
async fn insert_complete(&self, mut entry: Entry) -> std::io::Result<()> {
let hash = entry.hash();
let mut inner = self.inner.0.write().unwrap();
let complete = inner
.entries
.get(&hash)
.map(|x| x.complete)
.unwrap_or_default();
if !complete {
entry.complete = true;
inner.entries.insert(hash, entry);
}
Ok(())
}
}
impl ReadableStore for Store {
    /// Returns the hashes of all complete entries, in map order.
    ///
    /// The hashes are collected under the read lock, so the returned iterator
    /// is a snapshot that does not borrow the store.
    async fn blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
        let hashes: Vec<Hash> = self
            .read_lock()
            .entries
            .values()
            .filter(|entry| entry.complete)
            .map(|entry| entry.hash())
            .collect();
        Ok(Box::new(hashes.into_iter().map(Ok)))
    }

    /// Returns the hashes of all incomplete entries, in map order.
    ///
    /// The hashes are collected under the read lock, so the returned iterator
    /// is a snapshot that does not borrow the store.
    async fn partial_blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
        let hashes: Vec<Hash> = self
            .read_lock()
            .entries
            .values()
            .filter(|entry| !entry.complete)
            .map(|entry| entry.hash())
            .collect();
        Ok(Box::new(hashes.into_iter().map(Ok)))
    }

    /// Returns a snapshot of all named tags and the values they point to.
    async fn tags(
        &self,
    ) -> io::Result<crate::store::DbIter<(crate::Tag, iroh_base::hash::HashAndFormat)>> {
        // NOTE(review): presumably `Tag` trips this lint because of its inner
        // representation; confirm against the `Tag` definition.
        #[allow(clippy::mutable_key_type)]
        let tags = self.read_lock().tags.clone();
        Ok(Box::new(tags.into_iter().map(Ok)))
    }

    /// Returns the keys of the temp map at the time of the call.
    fn temp_tags(
        &self,
    ) -> Box<dyn Iterator<Item = iroh_base::hash::HashAndFormat> + Send + Sync + 'static> {
        let tags = self.read_lock().temp.keys();
        Box::new(tags)
    }

    /// Consistency checking is not implemented for this store.
    ///
    /// # Errors
    ///
    /// Always returns an `Unsupported` error. This replaces an unconditional
    /// panic so that callers can handle the missing feature.
    async fn consistency_check(
        &self,
        _repair: bool,
        _tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> io::Result<()> {
        Err(io::Error::new(
            io::ErrorKind::Unsupported,
            "consistency check is not implemented for this store",
        ))
    }

    /// Exports the data of `hash` to `target` on a blocking thread.
    ///
    /// See `export_sync` for the path requirements and progress behaviour.
    async fn export(
        &self,
        hash: Hash,
        target: std::path::PathBuf,
        mode: crate::store::ExportMode,
        progress: ExportProgressCb,
    ) -> io::Result<()> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress)).await?
    }
}