use std::{
collections::{BTreeMap, BTreeSet, HashMap},
future::Future,
io,
path::PathBuf,
sync::Arc,
};
use bao_tree::{
blake3,
io::{outboard::PreOrderMemOutboard, sync::Outboard},
};
use bytes::Bytes;
use futures_lite::Stream;
use iroh_io::AsyncSliceReader;
use tokio::io::AsyncWriteExt;
use super::{BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, DbIter, ExportProgressCb};
use crate::{
store::{
EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, MapEntryMut,
ReadableStore,
},
util::{
progress::{BoxedProgressSender, IdGenerator, ProgressSender},
Tag,
},
BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};
#[derive(Debug, Clone, Default)]
pub struct Store(Arc<HashMap<Hash, (PreOrderMemOutboard<Bytes>, Bytes)>>);
impl<K, V> FromIterator<(K, V)> for Store
where
K: Into<String>,
V: AsRef<[u8]>,
{
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
let (db, _m) = Self::new(iter);
db
}
}
impl Store {
pub fn new(
entries: impl IntoIterator<Item = (impl Into<String>, impl AsRef<[u8]>)>,
) -> (Self, BTreeMap<String, blake3::Hash>) {
let mut names = BTreeMap::new();
let mut res = HashMap::new();
for (name, data) in entries.into_iter() {
let name = name.into();
let data: &[u8] = data.as_ref();
let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
let hash = outboard.root();
names.insert(name, hash);
let data = Bytes::from(data.to_vec());
let hash = Hash::from(hash);
res.insert(hash, (outboard, data));
}
(Self(Arc::new(res)), names)
}
pub fn insert(&mut self, data: impl AsRef<[u8]>) -> Hash {
let inner = Arc::make_mut(&mut self.0);
let data: &[u8] = data.as_ref();
let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
let hash = outboard.root();
let data = Bytes::from(data.to_vec());
let hash = Hash::from(hash);
inner.insert(hash, (outboard, data));
hash
}
pub fn insert_many(
&mut self,
items: impl IntoIterator<Item = impl AsRef<[u8]>>,
) -> Option<Hash> {
let mut hash = None;
for item in items.into_iter() {
hash = Some(self.insert(item));
}
hash
}
pub fn get_content(&self, hash: &Hash) -> Option<Bytes> {
let entry = self.0.get(hash)?;
Some(entry.1.clone())
}
async fn export_impl(
&self,
hash: Hash,
target: PathBuf,
_mode: ExportMode,
progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
) -> io::Result<()> {
tracing::trace!("exporting {} to {}", hash, target.display());
if !target.is_absolute() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"target path must be absolute",
));
}
let parent = target.parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"target path has no parent directory",
)
})?;
tokio::fs::create_dir_all(parent).await?;
let data = self
.get_content(&hash)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
let mut offset = 0u64;
let mut file = tokio::fs::File::create(&target).await?;
for chunk in data.chunks(1024 * 1024) {
progress(offset)?;
file.write_all(chunk).await?;
offset += chunk.len() as u64;
}
file.sync_all().await?;
drop(file);
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct Entry {
outboard: PreOrderMemOutboard<Bytes>,
data: Bytes,
}
impl MapEntry for Entry {
fn hash(&self) -> Hash {
self.outboard.root().into()
}
fn size(&self) -> BaoBlobSize {
BaoBlobSize::Verified(self.data.len() as u64)
}
async fn outboard(&self) -> io::Result<impl bao_tree::io::fsm::Outboard> {
Ok(self.outboard.clone())
}
async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
Ok(self.data.clone())
}
fn is_complete(&self) -> bool {
true
}
}
impl Map for Store {
type Entry = Entry;
async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
Ok(self.0.get(hash).map(|(o, d)| Entry {
outboard: o.clone(),
data: d.clone(),
}))
}
}
impl super::MapMut for Store {
type EntryMut = Entry;
async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
self.get(hash).await
}
async fn get_or_create(&self, _hash: Hash, _size: u64) -> io::Result<Entry> {
Err(io::Error::new(
io::ErrorKind::Other,
"cannot create temp entry in readonly database",
))
}
fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
Ok(match self.0.contains_key(hash) {
true => EntryStatus::Complete,
false => EntryStatus::NotFound,
})
}
async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
self.entry_status_sync(hash)
}
async fn insert_complete(&self, _entry: Entry) -> io::Result<()> {
unreachable!()
}
}
impl ReadableStore for Store {
async fn blobs(&self) -> io::Result<DbIter<Hash>> {
Ok(Box::new(
self.0
.keys()
.copied()
.map(Ok)
.collect::<Vec<_>>()
.into_iter(),
))
}
async fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
Ok(Box::new(std::iter::empty()))
}
fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
Box::new(std::iter::empty())
}
async fn consistency_check(
&self,
_repair: bool,
_tx: BoxedProgressSender<ConsistencyCheckProgress>,
) -> io::Result<()> {
Ok(())
}
async fn export(
&self,
hash: Hash,
target: PathBuf,
mode: ExportMode,
progress: ExportProgressCb,
) -> io::Result<()> {
self.export_impl(hash, target, mode, progress).await
}
async fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
Ok(Box::new(std::iter::empty()))
}
}
impl MapEntryMut for Entry {
async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
enum Bar {}
impl BaoBatchWriter for Bar {
async fn write_batch(
&mut self,
_size: u64,
_batch: Vec<bao_tree::io::fsm::BaoContentItem>,
) -> io::Result<()> {
unreachable!()
}
async fn sync(&mut self) -> io::Result<()> {
unreachable!()
}
}
#[allow(unreachable_code)]
Ok(unreachable!() as Bar)
}
}
impl super::Store for Store {
async fn import_file(
&self,
data: PathBuf,
mode: ImportMode,
format: BlobFormat,
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> io::Result<(TempTag, u64)> {
let _ = (data, mode, progress, format);
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}
async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
let _ = (bytes, format);
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}
async fn import_stream(
&self,
data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send,
format: BlobFormat,
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> io::Result<(TempTag, u64)> {
let _ = (data, format, progress);
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}
async fn set_tag(&self, _name: Tag, _hash: Option<HashAndFormat>) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}
async fn create_tag(&self, _hash: HashAndFormat) -> io::Result<Tag> {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}
fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
TempTag::new(inner, None)
}
async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
where
G: Fn() -> Gut,
Gut: Future<Output = BTreeSet<Hash>> + Send,
{
super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
}
async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
}
async fn shutdown(&self) {}
async fn sync(&self) -> io::Result<()> {
Ok(())
}
}