use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
io,
path::{Path, PathBuf},
sync::{Arc, RwLock},
time::{Duration, SystemTime},
};
use bao_tree::io::{
fsm::Outboard,
sync::{ReadAt, Size},
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tokio::io::AsyncWriteExt;
use tracing::trace_span;
mod migrate_redb_v1_v2;
mod tables;
#[doc(hidden)]
pub mod test_support;
#[cfg(test)]
mod tests;
mod util;
mod validate;
use tables::{ReadOnlyTables, ReadableTables, Tables};
use self::{tables::DeleteSet, test_support::EntryData, util::PeekableFlumeReceiver};
use super::{
bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
};
use crate::{
store::{
bao_file::{BaoFileStorage, CompleteStorage},
fs::{
tables::BaoFilePart,
util::{overwrite_and_sync, read_and_remove},
},
GcMarkEvent, GcSweepEvent,
},
util::{
compute_outboard,
progress::{
BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError,
ProgressSender,
},
raw_outboard_size, MemOrFile, TagCounter, TagDrop,
},
Tag, TempTag,
};
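/// Location of the data of a complete entry.
///
/// Data can be inlined in the database (`Inline`), live in a file in the
/// data directory that is conceptually owned by the store (`Owned`), or
/// live in one or more external files owned by the user (`External`).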
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum DataLocation<I = (), E = ()> {
Inline(I),
Owned(E),
External(Vec<PathBuf>, E),
}
impl<X> DataLocation<X, u64> {
fn union(self, that: DataLocation<X, u64>) -> ActorResult<Self> {
Ok(match (self, that) {
(
DataLocation::External(mut paths, a_size),
DataLocation::External(b_paths, b_size),
) => {
if a_size != b_size {
return Err(ActorError::Inconsistent(format!(
"complete size mismatch {} {}",
a_size, b_size
)));
}
paths.extend(b_paths);
paths.sort();
paths.dedup();
DataLocation::External(paths, a_size)
}
            // Owned wins over inline and external: it has an associated
            // file in the data directory that would otherwise be orphaned.
            (_, b @ DataLocation::Owned(_)) => b,
            (a @ DataLocation::Owned(_), _) => a,
            // Inline wins over external, since the data already lives in
            // the database.
            (_, b @ DataLocation::Inline(_)) => b,
            (a @ DataLocation::Inline(_), _) => a,
})
}
}
impl<I, E> DataLocation<I, E> {
fn discard_inline_data(self) -> DataLocation<(), E> {
match self {
DataLocation::Inline(_) => DataLocation::Inline(()),
DataLocation::Owned(x) => DataLocation::Owned(x),
DataLocation::External(paths, x) => DataLocation::External(paths, x),
}
}
}
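/// Location of the outboard of a complete entry.
///
/// `NotNeeded` is used for blobs small enough that no outboard is required.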
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum OutboardLocation<I = ()> {
Inline(I),
Owned,
NotNeeded,
}
impl<I> OutboardLocation<I> {
fn discard_extra_data(self) -> OutboardLocation<()> {
match self {
Self::Inline(_) => OutboardLocation::Inline(()),
Self::Owned => OutboardLocation::Owned,
Self::NotNeeded => OutboardLocation::NotNeeded,
}
}
}
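/// State of a blob as persisted in the blobs table.
///
/// An entry is either complete, with known data and outboard locations, or
/// partial, with an optional last known validated size.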
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum EntryState<I = ()> {
Complete {
data_location: DataLocation<I, u64>,
outboard_location: OutboardLocation<I>,
},
Partial {
size: Option<u64>,
},
}
impl Default for EntryState {
fn default() -> Self {
Self::Partial { size: None }
}
}
impl EntryState {
fn union(self, that: Self) -> ActorResult<Self> {
match (self, that) {
(
Self::Complete {
data_location,
outboard_location,
},
Self::Complete {
data_location: b_data_location,
..
},
) => Ok(Self::Complete {
data_location: data_location.union(b_data_location)?,
outboard_location,
}),
            // complete wins over partial
            (a @ Self::Complete { .. }, Self::Partial { .. }) => Ok(a),
            (Self::Partial { .. }, b @ Self::Complete { .. }) => Ok(b),
            // both partial: keep the validated size if either side has one
            (Self::Partial { size: a_size }, Self::Partial { size: b_size }) => {
let size = match (a_size, b_size) {
(Some(a_size), Some(b_size)) => {
if a_size != b_size {
return Err(ActorError::Inconsistent(format!(
"validated size mismatch {} {}",
a_size, b_size
)));
}
Some(a_size)
}
(Some(a_size), None) => Some(a_size),
(None, Some(b_size)) => Some(b_size),
(None, None) => None,
};
Ok(Self::Partial { size })
}
}
}
}
impl redb::Value for EntryState {
type SelfType<'a> = EntryState;
type AsBytes<'a> = SmallVec<[u8; 128]>;
fn fixed_width() -> Option<usize> {
None
}
fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
where
Self: 'a,
{
        // the redb Value trait has no way to report errors, so a corrupt
        // record will panic here
        postcard::from_bytes(data).unwrap()
}
fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
where
Self: 'a,
Self: 'b,
{
postcard::to_extend(value, SmallVec::new()).unwrap()
}
fn type_name() -> redb::TypeName {
redb::TypeName::new("EntryState")
}
}
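/// Options for inlining small complete data or outboards into the database.
///
/// Both thresholds are inclusive: data of exactly `max_data_inlined` bytes
/// is still inlined. A minimal sketch of a store that keeps everything in
/// files (assuming a writable root directory `root`):
///
/// ```ignore
/// let options = Options {
///     path: PathOptions::new(&root),
///     inline: InlineOptions::NO_INLINE,
///     batch: BatchOptions::default(),
/// };
/// let store = Store::new(root.join("blobs.db"), options).await?;
/// ```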
#[derive(Debug, Clone)]
pub struct InlineOptions {
pub max_data_inlined: u64,
pub max_outboard_inlined: u64,
}
impl InlineOptions {
pub const NO_INLINE: Self = Self {
max_data_inlined: 0,
max_outboard_inlined: 0,
};
pub const ALWAYS_INLINE: Self = Self {
max_data_inlined: u64::MAX,
max_outboard_inlined: u64::MAX,
};
}
impl Default for InlineOptions {
fn default() -> Self {
Self {
max_data_inlined: 1024 * 16,
max_outboard_inlined: 1024 * 16,
}
}
}
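/// Directory layout of the store.
///
/// `data_path` holds the owned data, outboard and sizes files, named after
/// the blob hash; `temp_path` holds temporary files created during import.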
#[derive(Debug, Clone)]
pub struct PathOptions {
pub data_path: PathBuf,
pub temp_path: PathBuf,
}
impl PathOptions {
fn new(root: &Path) -> Self {
Self {
data_path: root.join("data"),
temp_path: root.join("temp"),
}
}
fn owned_data_path(&self, hash: &Hash) -> PathBuf {
self.data_path.join(format!("{}.data", hash.to_hex()))
}
fn owned_outboard_path(&self, hash: &Hash) -> PathBuf {
self.data_path.join(format!("{}.obao4", hash.to_hex()))
}
fn owned_sizes_path(&self, hash: &Hash) -> PathBuf {
self.data_path.join(format!("{}.sizes4", hash.to_hex()))
}
fn temp_file_name(&self) -> PathBuf {
self.temp_path.join(temp_name())
}
}
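/// Batching limits for the actor's read and write transactions.
///
/// A transaction stays open for at most `max_*_batch` messages or
/// `max_*_duration`, whichever is reached first.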
#[derive(Debug, Clone)]
pub struct BatchOptions {
pub max_read_batch: usize,
pub max_read_duration: Duration,
pub max_write_batch: usize,
pub max_write_duration: Duration,
}
impl Default for BatchOptions {
fn default() -> Self {
Self {
max_read_batch: 10000,
max_read_duration: Duration::from_secs(1),
max_write_batch: 1000,
max_write_duration: Duration::from_millis(500),
}
}
}
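/// Options for a file-based store: directory layout, inlining thresholds
/// and transaction batching.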
#[derive(Debug, Clone)]
pub struct Options {
pub path: PathOptions,
pub inline: InlineOptions,
pub batch: BatchOptions,
}
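/// Source of data for an import: a temp file owned by the store, an
/// external file owned by the user, or in-memory bytes.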
#[derive(derive_more::Debug)]
pub(crate) enum ImportSource {
TempFile(PathBuf),
External(PathBuf),
Memory(#[debug(skip)] Bytes),
}
impl ImportSource {
fn content(&self) -> MemOrFile<&[u8], &Path> {
match self {
Self::TempFile(path) => MemOrFile::File(path.as_path()),
Self::External(path) => MemOrFile::File(path.as_path()),
Self::Memory(data) => MemOrFile::Mem(data.as_ref()),
}
}
fn len(&self) -> io::Result<u64> {
match self {
Self::TempFile(path) => std::fs::metadata(path).map(|m| m.len()),
Self::External(path) => std::fs::metadata(path).map(|m| m.len()),
Self::Memory(data) => Ok(data.len() as u64),
}
}
}
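/// An entry of the store; a handle to a bao file that can be complete or
/// partial.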
pub type Entry = BaoFileHandle;
impl super::MapEntry for Entry {
fn hash(&self) -> Hash {
self.hash()
}
fn size(&self) -> BaoBlobSize {
let size = self.current_size().unwrap();
tracing::trace!("redb::Entry::size() = {}", size);
BaoBlobSize::new(size, self.is_complete())
}
fn is_complete(&self) -> bool {
self.is_complete()
}
async fn outboard(&self) -> io::Result<impl Outboard> {
self.outboard()
}
async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
Ok(self.data_reader())
}
}
impl super::MapEntryMut for Entry {
async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
Ok(self.writer())
}
}
#[derive(derive_more::Debug)]
pub(crate) struct Import {
content_id: HashAndFormat,
source: ImportSource,
data_size: u64,
#[debug("{:?}", outboard.as_ref().map(|x| x.len()))]
outboard: Option<Vec<u8>>,
}
#[derive(derive_more::Debug)]
pub(crate) struct Export {
temp_tag: TempTag,
target: PathBuf,
mode: ExportMode,
#[debug(skip)]
progress: ExportProgressCb,
}
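/// Messages sent from the store frontend to the database actor.
///
/// Most messages carry a oneshot sender for the reply.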
#[derive(derive_more::Debug)]
pub(crate) enum ActorMessage {
Get {
hash: Hash,
tx: oneshot::Sender<ActorResult<Option<BaoFileHandle>>>,
},
EntryStatus {
hash: Hash,
tx: oneshot::Sender<ActorResult<EntryStatus>>,
},
#[cfg(test)]
EntryState {
hash: Hash,
tx: oneshot::Sender<ActorResult<test_support::EntryStateResponse>>,
},
GetFullEntryState {
hash: Hash,
tx: oneshot::Sender<ActorResult<Option<EntryData>>>,
},
SetFullEntryState {
hash: Hash,
entry: Option<EntryData>,
tx: oneshot::Sender<ActorResult<()>>,
},
GetOrCreate {
hash: Hash,
tx: oneshot::Sender<ActorResult<BaoFileHandle>>,
},
OnMemSizeExceeded { hash: Hash },
OnComplete { handle: BaoFileHandle },
Import {
cmd: Import,
tx: oneshot::Sender<ActorResult<(TempTag, u64)>>,
},
Export {
cmd: Export,
tx: oneshot::Sender<ActorResult<()>>,
},
UpdateInlineOptions {
inline_options: InlineOptions,
reapply: bool,
tx: oneshot::Sender<()>,
},
Blobs {
#[debug(skip)]
filter: FilterPredicate<Hash, EntryState>,
#[allow(clippy::type_complexity)]
tx: oneshot::Sender<
ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>>,
>,
},
Tags {
#[debug(skip)]
filter: FilterPredicate<Tag, HashAndFormat>,
#[allow(clippy::type_complexity)]
tx: oneshot::Sender<
ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>>,
>,
},
SetTag {
tag: Tag,
value: Option<HashAndFormat>,
tx: oneshot::Sender<ActorResult<()>>,
},
CreateTag {
hash: HashAndFormat,
tx: oneshot::Sender<ActorResult<Tag>>,
},
Delete {
hashes: Vec<Hash>,
tx: oneshot::Sender<ActorResult<()>>,
},
GcDelete {
hashes: Vec<Hash>,
tx: oneshot::Sender<ActorResult<()>>,
},
Sync { tx: oneshot::Sender<()> },
Dump,
Fsck {
repair: bool,
progress: BoxedProgressSender<ConsistencyCheckProgress>,
tx: oneshot::Sender<ActorResult<()>>,
},
GcStart { tx: oneshot::Sender<()> },
Shutdown { tx: Option<oneshot::Sender<()>> },
}
impl ActorMessage {
fn category(&self) -> MessageCategory {
match self {
Self::Get { .. }
| Self::GetOrCreate { .. }
| Self::EntryStatus { .. }
| Self::Blobs { .. }
| Self::Tags { .. }
| Self::GcStart { .. }
| Self::GetFullEntryState { .. }
| Self::Dump => MessageCategory::ReadOnly,
Self::Import { .. }
| Self::Export { .. }
| Self::OnMemSizeExceeded { .. }
| Self::OnComplete { .. }
| Self::SetTag { .. }
| Self::CreateTag { .. }
| Self::SetFullEntryState { .. }
| Self::Delete { .. }
| Self::GcDelete { .. } => MessageCategory::ReadWrite,
Self::UpdateInlineOptions { .. }
| Self::Sync { .. }
| Self::Shutdown { .. }
| Self::Fsck { .. } => MessageCategory::TopLevel,
#[cfg(test)]
Self::EntryState { .. } => MessageCategory::ReadOnly,
}
}
}
enum MessageCategory {
ReadOnly,
ReadWrite,
TopLevel,
}
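/// Predicate for filtering entries during a table scan.
///
/// Receives the scan index and raw access guards for key and value, and
/// returns the decoded pair for entries that should be kept.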
pub(crate) type FilterPredicate<K, V> =
Box<dyn Fn(u64, AccessGuard<K>, AccessGuard<V>) -> Option<(K, V)> + Send + Sync>;
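/// A file-based blob store backed by a redb database.
///
/// The store is cheaply cloneable; all clones share a single actor thread.
/// A minimal usage sketch (assuming a tokio runtime and a writable root
/// directory):
///
/// ```ignore
/// let store = Store::load("/some/root").await?;
/// let tag = store
///     .import_bytes(Bytes::from_static(b"hello"), BlobFormat::Raw)
///     .await?;
/// ```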
#[derive(Debug, Clone)]
pub struct Store(Arc<StoreInner>);
impl Store {
pub async fn load(root: impl AsRef<Path>) -> io::Result<Self> {
let path = root.as_ref();
let db_path = path.join("blobs.db");
let options = Options {
path: PathOptions::new(path),
inline: Default::default(),
batch: Default::default(),
};
Self::new(db_path, options).await
}
pub async fn new(path: PathBuf, options: Options) -> io::Result<Self> {
let rt = tokio::runtime::Handle::try_current()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "no tokio runtime"))?;
let inner =
tokio::task::spawn_blocking(move || StoreInner::new_sync(path, options, rt)).await??;
Ok(Self(Arc::new(inner)))
}
pub async fn update_inline_options(
&self,
inline_options: InlineOptions,
reapply: bool,
) -> io::Result<()> {
Ok(self
.0
.update_inline_options(inline_options, reapply)
.await?)
}
pub async fn dump(&self) -> io::Result<()> {
Ok(self.0.dump().await?)
}
}
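/// Shared state behind a `Store`: the channel to the actor, the temp tag
/// counters, and the join handle of the actor thread.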
#[derive(Debug)]
struct StoreInner {
tx: async_channel::Sender<ActorMessage>,
temp: Arc<RwLock<TempCounterMap>>,
handle: Option<std::thread::JoinHandle<()>>,
path_options: Arc<PathOptions>,
}
impl TagDrop for RwLock<TempCounterMap> {
fn on_drop(&self, content: &HashAndFormat) {
self.write().unwrap().dec(content);
}
}
impl TagCounter for RwLock<TempCounterMap> {
fn on_create(&self, content: &HashAndFormat) {
self.write().unwrap().inc(content);
}
}
impl StoreInner {
fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result<Self> {
tracing::trace!(
"creating data directory: {}",
options.path.data_path.display()
);
std::fs::create_dir_all(&options.path.data_path)?;
tracing::trace!(
"creating temp directory: {}",
options.path.temp_path.display()
);
std::fs::create_dir_all(&options.path.temp_path)?;
tracing::trace!(
"creating parent directory for db file{}",
path.parent().unwrap().display()
);
std::fs::create_dir_all(path.parent().unwrap())?;
let temp: Arc<RwLock<TempCounterMap>> = Default::default();
let (actor, tx) = Actor::new(&path, options.clone(), temp.clone(), rt.clone())?;
let handle = std::thread::Builder::new()
.name("redb-actor".to_string())
.spawn(move || {
rt.block_on(async move {
if let Err(cause) = actor.run_batched().await {
tracing::error!("redb actor failed: {}", cause);
}
});
})
.expect("failed to spawn thread");
Ok(Self {
tx,
temp,
handle: Some(handle),
path_options: Arc::new(options.path),
})
}
pub async fn get(&self, hash: Hash) -> OuterResult<Option<BaoFileHandle>> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::Get { hash, tx }).await?;
Ok(rx.await??)
}
async fn get_or_create(&self, hash: Hash) -> OuterResult<BaoFileHandle> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::GetOrCreate { hash, tx }).await?;
Ok(rx.await??)
}
async fn blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
let (tx, rx) = oneshot::channel();
let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
let v = v.value();
if let EntryState::Complete { .. } = &v {
Some((k.value(), v))
} else {
None
}
});
self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
let blobs = rx.await?;
let res = blobs?
.into_iter()
.map(|r| {
r.map(|(hash, _)| hash)
.map_err(|e| ActorError::from(e).into())
})
.collect::<Vec<_>>();
Ok(res)
}
async fn partial_blobs(&self) -> OuterResult<Vec<io::Result<Hash>>> {
let (tx, rx) = oneshot::channel();
let filter: FilterPredicate<Hash, EntryState> = Box::new(|_i, k, v| {
let v = v.value();
if let EntryState::Partial { .. } = &v {
Some((k.value(), v))
} else {
None
}
});
self.tx.send(ActorMessage::Blobs { filter, tx }).await?;
let blobs = rx.await?;
let res = blobs?
.into_iter()
.map(|r| {
r.map(|(hash, _)| hash)
.map_err(|e| ActorError::from(e).into())
})
.collect::<Vec<_>>();
Ok(res)
}
async fn tags(&self) -> OuterResult<Vec<io::Result<(Tag, HashAndFormat)>>> {
let (tx, rx) = oneshot::channel();
let filter: FilterPredicate<Tag, HashAndFormat> =
Box::new(|_i, k, v| Some((k.value(), v.value())));
self.tx.send(ActorMessage::Tags { filter, tx }).await?;
let tags = rx.await?;
let tags = tags?
.into_iter()
.map(|r| r.map_err(|e| ActorError::from(e).into()))
.collect();
Ok(tags)
}
async fn set_tag(&self, tag: Tag, value: Option<HashAndFormat>) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx
.send(ActorMessage::SetTag { tag, value, tx })
.await?;
Ok(rx.await??)
}
async fn create_tag(&self, hash: HashAndFormat) -> OuterResult<Tag> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::CreateTag { hash, tx }).await?;
Ok(rx.await??)
}
async fn delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::Delete { hashes, tx }).await?;
Ok(rx.await??)
}
async fn gc_delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::GcDelete { hashes, tx }).await?;
Ok(rx.await??)
}
async fn gc_start(&self) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::GcStart { tx }).await?;
Ok(rx.await?)
}
async fn entry_status(&self, hash: &Hash) -> OuterResult<EntryStatus> {
let (tx, rx) = oneshot::channel();
self.tx
.send(ActorMessage::EntryStatus { hash: *hash, tx })
.await?;
Ok(rx.await??)
}
fn entry_status_sync(&self, hash: &Hash) -> OuterResult<EntryStatus> {
let (tx, rx) = oneshot::channel();
self.tx
.send_blocking(ActorMessage::EntryStatus { hash: *hash, tx })?;
Ok(rx.recv()??)
}
async fn complete(&self, entry: Entry) -> OuterResult<()> {
self.tx
.send(ActorMessage::OnComplete { handle: entry })
.await?;
Ok(())
}
async fn export(
&self,
hash: Hash,
target: PathBuf,
mode: ExportMode,
progress: ExportProgressCb,
) -> OuterResult<()> {
tracing::debug!(
"exporting {} to {} using mode {:?}",
hash.to_hex(),
target.display(),
mode
);
if !target.is_absolute() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"target path must be absolute",
)
.into());
}
let parent = target.parent().ok_or_else(|| {
OuterError::from(io::Error::new(
io::ErrorKind::InvalidInput,
"target path has no parent directory",
))
})?;
std::fs::create_dir_all(parent)?;
let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
let (tx, rx) = oneshot::channel();
self.tx
.send(ActorMessage::Export {
cmd: Export {
temp_tag,
target,
mode,
progress,
},
tx,
})
.await?;
Ok(rx.await??)
}
async fn consistency_check(
&self,
repair: bool,
progress: BoxedProgressSender<ConsistencyCheckProgress>,
) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx
.send(ActorMessage::Fsck {
repair,
progress,
tx,
})
.await?;
Ok(rx.await??)
}
async fn update_inline_options(
&self,
inline_options: InlineOptions,
reapply: bool,
) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx
.send(ActorMessage::UpdateInlineOptions {
inline_options,
reapply,
tx,
})
.await?;
Ok(rx.await?)
}
async fn dump(&self) -> OuterResult<()> {
self.tx.send(ActorMessage::Dump).await?;
Ok(())
}
async fn sync(&self) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::Sync { tx }).await?;
Ok(rx.await?)
}
fn import_file_sync(
&self,
path: PathBuf,
mode: ImportMode,
format: BlobFormat,
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> OuterResult<(TempTag, u64)> {
if !path.is_absolute() {
return Err(
io::Error::new(io::ErrorKind::InvalidInput, "path must be absolute").into(),
);
}
if !path.is_file() && !path.is_symlink() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"path is not a file or symlink",
)
.into());
}
let id = progress.new_id();
progress.blocking_send(ImportProgress::Found {
id,
name: path.to_string_lossy().to_string(),
})?;
let file = match mode {
ImportMode::TryReference => ImportSource::External(path),
ImportMode::Copy => {
                // files below 16 KiB are read into memory directly and
                // imported without a temp file
                if std::fs::metadata(&path)?.len() < 16 * 1024 {
let data = std::fs::read(&path)?;
ImportSource::Memory(data.into())
} else {
let temp_path = self.temp_file_name();
progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() {
tracing::debug!("reflinked {} to {}", path.display(), temp_path.display());
} else {
tracing::debug!("copied {} to {}", path.display(), temp_path.display());
}
ImportSource::TempFile(temp_path)
}
}
};
let (tag, size) = self.finalize_import_sync(file, format, id, progress)?;
Ok((tag, size))
}
fn import_bytes_sync(&self, data: Bytes, format: BlobFormat) -> OuterResult<TempTag> {
let id = 0;
let file = ImportSource::Memory(data);
let progress = IgnoreProgressSender::default();
let (tag, _size) = self.finalize_import_sync(file, format, id, progress)?;
Ok(tag)
}
fn finalize_import_sync(
&self,
file: ImportSource,
format: BlobFormat,
id: u64,
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> OuterResult<(TempTag, u64)> {
let data_size = file.len()?;
tracing::debug!("finalize_import_sync {:?} {}", file, data_size);
progress.blocking_send(ImportProgress::Size {
id,
size: data_size,
})?;
let progress2 = progress.clone();
let (hash, outboard) = match file.content() {
MemOrFile::File(path) => {
let span = trace_span!("outboard.compute", path = %path.display());
let _guard = span.enter();
let file = std::fs::File::open(path)?;
compute_outboard(file, data_size, move |offset| {
Ok(progress2.try_send(ImportProgress::OutboardProgress { id, offset })?)
})?
}
            MemOrFile::Mem(bytes) => compute_outboard(bytes, data_size, |_| Ok(()))?,
};
progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
let tag = self.temp.temp_tag(HashAndFormat { hash, format });
let hash = *tag.hash();
let (tx, rx) = oneshot::channel();
self.tx.send_blocking(ActorMessage::Import {
cmd: Import {
content_id: HashAndFormat { hash, format },
source: file,
outboard,
data_size,
},
tx,
})?;
Ok(rx.recv()??)
}
fn temp_file_name(&self) -> PathBuf {
self.path_options.temp_file_name()
}
async fn shutdown(&self) {
let (tx, rx) = oneshot::channel();
self.tx
.send(ActorMessage::Shutdown { tx: Some(tx) })
.await
.ok();
rx.await.ok();
}
}
impl Drop for StoreInner {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
self.tx
.send_blocking(ActorMessage::Shutdown { tx: None })
.ok();
handle.join().ok();
}
}
}
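/// Mutable state of the database actor: cached weak file handles, hashes
/// protected from garbage collection, and the store options.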
struct ActorState {
handles: BTreeMap<Hash, BaoFileHandleWeak>,
protected: BTreeSet<Hash>,
temp: Arc<RwLock<TempCounterMap>>,
msgs_rx: async_channel::Receiver<ActorMessage>,
create_options: Arc<BaoFileConfig>,
options: Options,
rt: tokio::runtime::Handle,
}
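/// The database actor. Owns the redb database and processes messages in
/// batched read or write transactions on a dedicated thread.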
struct Actor {
db: redb::Database,
state: ActorState,
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
#[error("table error: {0}")]
Table(#[from] redb::TableError),
#[error("database error: {0}")]
Database(#[from] redb::DatabaseError),
#[error("transaction error: {0}")]
Transaction(#[from] redb::TransactionError),
#[error("commit error: {0}")]
Commit(#[from] redb::CommitError),
#[error("storage error: {0}")]
Storage(#[from] redb::StorageError),
#[error("io error: {0}")]
Io(#[from] io::Error),
#[error("inconsistent database state: {0}")]
Inconsistent(String),
#[error("error during database migration: {0}")]
Migration(#[source] anyhow::Error),
}
impl From<ActorError> for io::Error {
fn from(e: ActorError) -> Self {
match e {
ActorError::Io(e) => e,
e => io::Error::new(io::ErrorKind::Other, e),
}
}
}
pub(crate) type ActorResult<T> = std::result::Result<T, ActorError>;
#[derive(Debug, thiserror::Error)]
pub(crate) enum OuterError {
#[error("inner error: {0}")]
Inner(#[from] ActorError),
#[error("send error")]
Send,
#[error("progress send error: {0}")]
ProgressSend(#[from] ProgressSendError),
#[error("recv error: {0}")]
Recv(#[from] oneshot::RecvError),
#[error("recv error: {0}")]
AsyncChannelRecv(#[from] async_channel::RecvError),
#[error("join error: {0}")]
JoinTask(#[from] tokio::task::JoinError),
}
impl From<async_channel::SendError<ActorMessage>> for OuterError {
fn from(_e: async_channel::SendError<ActorMessage>) -> Self {
OuterError::Send
}
}
pub(crate) type OuterResult<T> = std::result::Result<T, OuterError>;
impl From<io::Error> for OuterError {
fn from(e: io::Error) -> Self {
OuterError::Inner(ActorError::Io(e))
}
}
impl From<OuterError> for io::Error {
fn from(e: OuterError) -> Self {
match e {
OuterError::Inner(ActorError::Io(e)) => e,
e => io::Error::new(io::ErrorKind::Other, e),
}
}
}
impl super::Map for Store {
type Entry = Entry;
async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
Ok(self.0.get(*hash).await?.map(From::from))
}
}
impl super::MapMut for Store {
type EntryMut = Entry;
async fn get_or_create(&self, hash: Hash, _size: u64) -> io::Result<Self::EntryMut> {
Ok(self.0.get_or_create(hash).await?)
}
async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
Ok(self.0.entry_status(hash).await?)
}
async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
self.get(hash).await
}
async fn insert_complete(&self, entry: Self::EntryMut) -> io::Result<()> {
Ok(self.0.complete(entry).await?)
}
fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
Ok(self.0.entry_status_sync(hash)?)
}
}
impl super::ReadableStore for Store {
async fn blobs(&self) -> io::Result<super::DbIter<Hash>> {
Ok(Box::new(self.0.blobs().await?.into_iter()))
}
async fn partial_blobs(&self) -> io::Result<super::DbIter<Hash>> {
Ok(Box::new(self.0.partial_blobs().await?.into_iter()))
}
async fn tags(&self) -> io::Result<super::DbIter<(Tag, HashAndFormat)>> {
Ok(Box::new(self.0.tags().await?.into_iter()))
}
fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
Box::new(self.0.temp.read().unwrap().keys())
}
async fn consistency_check(
&self,
repair: bool,
tx: BoxedProgressSender<ConsistencyCheckProgress>,
) -> io::Result<()> {
self.0.consistency_check(repair, tx.clone()).await?;
Ok(())
}
async fn export(
&self,
hash: Hash,
target: PathBuf,
mode: ExportMode,
progress: ExportProgressCb,
) -> io::Result<()> {
Ok(self.0.export(hash, target, mode, progress).await?)
}
}
impl super::Store for Store {
async fn import_file(
&self,
path: PathBuf,
mode: ImportMode,
format: BlobFormat,
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> io::Result<(crate::TempTag, u64)> {
let this = self.0.clone();
Ok(
tokio::task::spawn_blocking(move || {
this.import_file_sync(path, mode, format, progress)
})
.await??,
)
}
async fn import_bytes(
&self,
data: bytes::Bytes,
format: iroh_base::hash::BlobFormat,
) -> io::Result<crate::TempTag> {
let this = self.0.clone();
Ok(tokio::task::spawn_blocking(move || this.import_bytes_sync(data, format)).await??)
}
async fn import_stream(
&self,
mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
format: BlobFormat,
progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
) -> io::Result<(TempTag, u64)> {
let this = self.clone();
let id = progress.new_id();
let temp_data_path = this.0.temp_file_name();
let name = temp_data_path
.file_name()
.expect("just created")
.to_string_lossy()
.to_string();
progress.send(ImportProgress::Found { id, name }).await?;
let mut writer = tokio::fs::File::create(&temp_data_path).await?;
let mut offset = 0;
while let Some(chunk) = data.next().await {
let chunk = chunk?;
writer.write_all(&chunk).await?;
offset += chunk.len() as u64;
progress.try_send(ImportProgress::CopyProgress { id, offset })?;
}
writer.flush().await?;
drop(writer);
let file = ImportSource::TempFile(temp_data_path);
Ok(tokio::task::spawn_blocking(move || {
this.0.finalize_import_sync(file, format, id, progress)
})
.await??)
}
async fn set_tag(&self, name: Tag, hash: Option<HashAndFormat>) -> io::Result<()> {
Ok(self.0.set_tag(name, hash).await?)
}
async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
Ok(self.0.create_tag(hash).await?)
}
async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
Ok(self.0.delete(hashes).await?)
}
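    /// Run the garbage collection loop.
    ///
    /// Each cycle notifies the actor of a GC start (which clears
    /// protections), sleeps for `config.period`, marks everything reachable
    /// from the protected set as live, then sweeps unreferenced blobs.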
async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
where
G: Fn() -> Gut,
Gut: Future<Output = BTreeSet<Hash>> + Send,
{
tracing::info!("Starting GC task with interval {:?}", config.period);
let mut live = BTreeSet::new();
'outer: loop {
if let Err(cause) = self.0.gc_start().await {
tracing::debug!(
"unable to notify the db of GC start: {cause}. Shutting down GC loop."
);
break;
}
tokio::time::sleep(config.period).await;
tracing::debug!("Starting GC");
live.clear();
let p = protected_cb().await;
live.extend(p);
tracing::debug!("Starting GC mark phase");
let live_ref = &mut live;
let mut stream = Gen::new(|co| async move {
if let Err(e) = super::gc_mark_task(self, live_ref, &co).await {
co.yield_(GcMarkEvent::Error(e)).await;
}
});
while let Some(item) = stream.next().await {
match item {
GcMarkEvent::CustomDebug(text) => {
tracing::debug!("{}", text);
}
GcMarkEvent::CustomWarning(text, _) => {
tracing::warn!("{}", text);
}
GcMarkEvent::Error(err) => {
tracing::error!("Fatal error during GC mark {}", err);
continue 'outer;
}
}
}
drop(stream);
tracing::debug!("Starting GC sweep phase");
let live_ref = &live;
let mut stream = Gen::new(|co| async move {
if let Err(e) = gc_sweep_task(self, live_ref, &co).await {
co.yield_(GcSweepEvent::Error(e)).await;
}
});
while let Some(item) = stream.next().await {
match item {
GcSweepEvent::CustomDebug(text) => {
tracing::debug!("{}", text);
}
GcSweepEvent::CustomWarning(text, _) => {
tracing::warn!("{}", text);
}
GcSweepEvent::Error(err) => {
tracing::error!("Fatal error during GC mark {}", err);
continue 'outer;
}
}
}
if let Some(ref cb) = config.done_callback {
cb();
}
}
}
fn temp_tag(&self, value: HashAndFormat) -> TempTag {
self.0.temp.temp_tag(value)
}
async fn sync(&self) -> io::Result<()> {
Ok(self.0.sync().await?)
}
async fn shutdown(&self) {
self.0.shutdown().await;
}
}
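/// Sweep phase of garbage collection: delete all blobs that are not in the
/// live set, in batches of 100 hashes.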
pub(super) async fn gc_sweep_task<'a>(
store: &'a Store,
live: &BTreeSet<Hash>,
co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
let mut count = 0;
let mut batch = Vec::new();
for hash in blobs {
let hash = hash?;
if !live.contains(&hash) {
batch.push(hash);
count += 1;
}
if batch.len() >= 100 {
store.0.gc_delete(batch.clone()).await?;
batch.clear();
}
}
if !batch.is_empty() {
store.0.gc_delete(batch).await?;
}
co.yield_(GcSweepEvent::CustomDebug(format!(
"deleted {} blobs",
count
)))
.await;
Ok(())
}
impl Actor {
fn new(
path: &Path,
options: Options,
temp: Arc<RwLock<TempCounterMap>>,
rt: tokio::runtime::Handle,
) -> ActorResult<(Self, async_channel::Sender<ActorMessage>)> {
let db = match redb::Database::create(path) {
Ok(db) => db,
Err(DatabaseError::UpgradeRequired(1)) => {
migrate_redb_v1_v2::run(path).map_err(ActorError::Migration)?
}
Err(err) => return Err(err.into()),
};
let txn = db.begin_write()?;
let mut t = Default::default();
let tables = Tables::new(&txn, &mut t)?;
drop(tables);
txn.commit()?;
let (tx, rx) = async_channel::bounded(1024);
let tx2 = tx.clone();
let on_file_create: CreateCb = Arc::new(move |hash| {
tx2.send_blocking(ActorMessage::OnMemSizeExceeded { hash: *hash })
.ok();
Ok(())
});
let create_options = BaoFileConfig::new(
Arc::new(options.path.data_path.clone()),
            // keep partial entries up to 16 KiB in memory before creating
            // a file
            16 * 1024,
Some(on_file_create),
);
Ok((
Self {
db,
state: ActorState {
temp,
handles: BTreeMap::new(),
protected: BTreeSet::new(),
msgs_rx: rx,
options,
create_options: Arc::new(create_options),
rt,
},
},
tx,
))
}
async fn run_batched(mut self) -> ActorResult<()> {
let mut msgs = PeekableFlumeReceiver::new(self.state.msgs_rx.clone());
while let Some(msg) = msgs.recv().await {
            if let ActorMessage::Shutdown { tx } = msg {
                // drop the actor, and with it the database, before
                // acknowledging shutdown
                drop(self);
if let Some(tx) = tx {
tx.send(()).ok();
}
break;
}
match msg.category() {
MessageCategory::TopLevel => {
self.state.handle_toplevel(&self.db, msg)?;
}
MessageCategory::ReadOnly => {
msgs.push_back(msg).expect("just recv'd");
tracing::debug!("starting read transaction");
let txn = self.db.begin_read()?;
let tables = ReadOnlyTables::new(&txn)?;
let count = self.state.options.batch.max_read_batch;
let timeout = tokio::time::sleep(self.state.options.batch.max_read_duration);
tokio::pin!(timeout);
for _ in 0..count {
tokio::select! {
msg = msgs.recv() => {
if let Some(msg) = msg {
if let Err(msg) = self.state.handle_readonly(&tables, msg)? {
msgs.push_back(msg).expect("just recv'd");
break;
}
} else {
break;
}
}
_ = &mut timeout => {
tracing::debug!("read transaction timed out");
break;
}
}
}
tracing::debug!("done with read transaction");
}
MessageCategory::ReadWrite => {
msgs.push_back(msg).expect("just recv'd");
tracing::debug!("starting write transaction");
let txn = self.db.begin_write()?;
let mut delete_after_commit = Default::default();
let mut tables = Tables::new(&txn, &mut delete_after_commit)?;
let count = self.state.options.batch.max_write_batch;
let timeout = tokio::time::sleep(self.state.options.batch.max_write_duration);
tokio::pin!(timeout);
for _ in 0..count {
tokio::select! {
msg = msgs.recv() => {
if let Some(msg) = msg {
if let Err(msg) = self.state.handle_readwrite(&mut tables, msg)? {
msgs.push_back(msg).expect("just recv'd");
break;
}
} else {
break;
}
}
_ = &mut timeout => {
tracing::debug!("write transaction timed out");
break;
}
}
}
drop(tables);
txn.commit()?;
delete_after_commit.apply_and_clear(&self.state.options.path);
tracing::debug!("write transaction committed");
}
}
}
tracing::debug!("redb actor done");
Ok(())
}
}
impl ActorState {
fn entry_status(
&mut self,
tables: &impl ReadableTables,
hash: Hash,
) -> ActorResult<EntryStatus> {
let status = match tables.blobs().get(hash)? {
Some(guard) => match guard.value() {
EntryState::Complete { .. } => EntryStatus::Complete,
EntryState::Partial { .. } => EntryStatus::Partial,
},
None => EntryStatus::NotFound,
};
Ok(status)
}
fn get(
&mut self,
tables: &impl ReadableTables,
hash: Hash,
) -> ActorResult<Option<BaoFileHandle>> {
if let Some(handle) = self.handles.get(&hash).and_then(|weak| weak.upgrade()) {
return Ok(Some(handle));
}
let Some(entry) = tables.blobs().get(hash)? else {
return Ok(None);
};
let entry = entry.value();
let config = self.create_options.clone();
let handle = match entry {
EntryState::Complete {
data_location,
outboard_location,
} => {
let data = load_data(tables, &self.options.path, data_location, &hash)?;
let outboard = load_outboard(
tables,
&self.options.path,
outboard_location,
data.size(),
&hash,
)?;
BaoFileHandle::new_complete(config, hash, data, outboard)
}
EntryState::Partial { .. } => BaoFileHandle::incomplete_file(config, hash)?,
};
self.handles.insert(hash, handle.downgrade());
Ok(Some(handle))
}
fn export(
&mut self,
tables: &mut Tables,
cmd: Export,
tx: oneshot::Sender<ActorResult<()>>,
) -> ActorResult<()> {
let Export {
temp_tag,
target,
mode,
progress,
} = cmd;
let guard = tables
.blobs
.get(temp_tag.hash())?
.ok_or_else(|| ActorError::Inconsistent("entry not found".to_owned()))?;
let entry = guard.value();
match entry {
EntryState::Complete {
data_location,
outboard_location,
} => match data_location {
DataLocation::Inline(()) => {
let data = tables.inline_data.get(temp_tag.hash())?.ok_or_else(|| {
ActorError::Inconsistent("inline data not found".to_owned())
})?;
tracing::trace!("exporting inline data to {}", target.display());
tx.send(std::fs::write(&target, data.value()).map_err(|e| e.into()))
.ok();
}
DataLocation::Owned(size) => {
let path = self.options.path.owned_data_path(temp_tag.hash());
match mode {
ExportMode::Copy => {
self.rt.spawn_blocking(move || {
tx.send(export_file_copy(temp_tag, path, size, target, progress))
.ok();
});
}
ExportMode::TryReference => match std::fs::rename(&path, &target) {
Ok(()) => {
let entry = EntryState::Complete {
data_location: DataLocation::External(vec![target], size),
outboard_location,
};
drop(guard);
tables.blobs.insert(temp_tag.hash(), entry)?;
drop(temp_tag);
tx.send(Ok(())).ok();
}
Err(e) => {
                            // EXDEV on Linux: rename failed because source
                            // and target are on different filesystems, so
                            // fall back to a copy
                            const ERR_CROSS: i32 = 18;
                            if e.raw_os_error() == Some(ERR_CROSS) {
match std::fs::copy(&path, &target) {
Ok(_) => {
let entry = EntryState::Complete {
data_location: DataLocation::External(
vec![target],
size,
),
outboard_location,
};
drop(guard);
tables.blobs.insert(temp_tag.hash(), entry)?;
tables
.delete_after_commit
.insert(*temp_tag.hash(), [BaoFilePart::Data]);
drop(temp_tag);
tx.send(Ok(())).ok();
}
Err(e) => {
drop(temp_tag);
tx.send(Err(e.into())).ok();
}
}
} else {
drop(temp_tag);
tx.send(Err(e.into())).ok();
}
}
},
}
}
DataLocation::External(paths, size) => {
let path = paths
.first()
.ok_or_else(|| {
ActorError::Inconsistent("external path missing".to_owned())
})?
.to_owned();
                    if path == target {
                        // the file is already at the requested location
                        tx.send(Ok(())).ok();
} else {
self.rt.spawn_blocking(move || {
tx.send(export_file_copy(temp_tag, path, size, target, progress))
.ok();
});
}
}
},
EntryState::Partial { .. } => {
return Err(io::Error::new(io::ErrorKind::Unsupported, "partial entry").into());
}
}
Ok(())
}
fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
let Import {
content_id,
source: file,
outboard,
data_size,
} = cmd;
let outboard_size = outboard.as_ref().map(|x| x.len() as u64).unwrap_or(0);
let inline_data = data_size <= self.options.inline.max_data_inlined;
let inline_outboard =
outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0;
let tag = self.temp.temp_tag(content_id);
let hash = *tag.hash();
self.protected.insert(hash);
let data_location = match file {
ImportSource::External(external_path) => {
tracing::debug!("stored external reference {}", external_path.display());
if inline_data {
tracing::debug!(
"reading external data to inline it: {}",
external_path.display()
);
let data = Bytes::from(std::fs::read(&external_path)?);
DataLocation::Inline(data)
} else {
DataLocation::External(vec![external_path], data_size)
}
}
ImportSource::TempFile(temp_data_path) => {
if inline_data {
tracing::debug!(
"reading and deleting temp file to inline it: {}",
temp_data_path.display()
);
let data = Bytes::from(read_and_remove(&temp_data_path)?);
DataLocation::Inline(data)
} else {
let data_path = self.options.path.owned_data_path(&hash);
std::fs::rename(&temp_data_path, &data_path)?;
tracing::debug!("created file {}", data_path.display());
DataLocation::Owned(data_size)
}
}
ImportSource::Memory(data) => {
if inline_data {
DataLocation::Inline(data)
} else {
let data_path = self.options.path.owned_data_path(&hash);
overwrite_and_sync(&data_path, &data)?;
tracing::debug!("created file {}", data_path.display());
DataLocation::Owned(data_size)
}
}
};
let outboard_location = if let Some(outboard) = outboard {
if inline_outboard {
OutboardLocation::Inline(Bytes::from(outboard))
} else {
let outboard_path = self.options.path.owned_outboard_path(&hash);
overwrite_and_sync(&outboard_path, &outboard)?;
OutboardLocation::Owned
}
} else {
OutboardLocation::NotNeeded
};
if let DataLocation::Inline(data) = &data_location {
tables.inline_data.insert(hash, data.as_ref())?;
}
if let OutboardLocation::Inline(outboard) = &outboard_location {
tables.inline_outboard.insert(hash, outboard.as_ref())?;
}
if let DataLocation::Owned(_) = &data_location {
tables.delete_after_commit.remove(hash, [BaoFilePart::Data]);
}
if let OutboardLocation::Owned = &outboard_location {
tables
.delete_after_commit
.remove(hash, [BaoFilePart::Outboard]);
}
let entry = tables.blobs.get(hash)?;
let entry = entry.map(|x| x.value()).unwrap_or_default();
let data_location = data_location.discard_inline_data();
let outboard_location = outboard_location.discard_extra_data();
let entry = entry.union(EntryState::Complete {
data_location,
outboard_location,
})?;
tables.blobs.insert(hash, entry)?;
Ok((tag, data_size))
}
fn get_or_create(
&mut self,
tables: &impl ReadableTables,
hash: Hash,
) -> ActorResult<BaoFileHandle> {
self.protected.insert(hash);
if let Some(handle) = self.handles.get(&hash).and_then(|x| x.upgrade()) {
return Ok(handle);
}
let entry = tables.blobs().get(hash)?;
let handle = if let Some(entry) = entry {
let entry = entry.value();
match entry {
EntryState::Complete {
data_location,
outboard_location,
..
} => {
let data = load_data(tables, &self.options.path, data_location, &hash)?;
let outboard = load_outboard(
tables,
&self.options.path,
outboard_location,
data.size(),
&hash,
)?;
tracing::debug!("creating complete entry for {}", hash.to_hex());
BaoFileHandle::new_complete(self.create_options.clone(), hash, data, outboard)
}
EntryState::Partial { .. } => {
tracing::debug!("creating partial entry for {}", hash.to_hex());
BaoFileHandle::incomplete_file(self.create_options.clone(), hash)?
}
}
} else {
BaoFileHandle::incomplete_mem(self.create_options.clone(), hash)
};
self.handles.insert(hash, handle.downgrade());
Ok(handle)
}
fn blobs(
&mut self,
tables: &impl ReadableTables,
filter: FilterPredicate<Hash, EntryState>,
) -> ActorResult<Vec<std::result::Result<(Hash, EntryState), StorageError>>> {
let mut res = Vec::new();
let mut index = 0u64;
#[allow(clippy::explicit_counter_loop)]
for item in tables.blobs().iter()? {
match item {
Ok((k, v)) => {
if let Some(item) = filter(index, k, v) {
res.push(Ok(item));
}
}
Err(e) => {
res.push(Err(e));
}
}
index += 1;
}
Ok(res)
}
fn tags(
&mut self,
tables: &impl ReadableTables,
filter: FilterPredicate<Tag, HashAndFormat>,
) -> ActorResult<Vec<std::result::Result<(Tag, HashAndFormat), StorageError>>> {
let mut res = Vec::new();
let mut index = 0u64;
#[allow(clippy::explicit_counter_loop)]
for item in tables.tags().iter()? {
match item {
Ok((k, v)) => {
if let Some(item) = filter(index, k, v) {
res.push(Ok(item));
}
}
Err(e) => {
res.push(Err(e));
}
}
index += 1;
}
Ok(res)
}
fn create_tag(&mut self, tables: &mut Tables, content: HashAndFormat) -> ActorResult<Tag> {
let tag = {
let tag = Tag::auto(SystemTime::now(), |x| {
matches!(tables.tags.get(Tag(Bytes::copy_from_slice(x))), Ok(Some(_)))
});
tables.tags.insert(tag.clone(), content)?;
tag
};
Ok(tag)
}
fn set_tag(
&self,
tables: &mut Tables,
tag: Tag,
value: Option<HashAndFormat>,
) -> ActorResult<()> {
match value {
Some(value) => {
tables.tags.insert(tag, value)?;
}
None => {
tables.tags.remove(tag)?;
}
}
Ok(())
}
fn on_mem_size_exceeded(&mut self, tables: &mut Tables, hash: Hash) -> ActorResult<()> {
let entry = tables
.blobs
.get(hash)?
.map(|x| x.value())
.unwrap_or_default();
let entry = entry.union(EntryState::Partial { size: None })?;
tables.blobs.insert(hash, entry)?;
tables.delete_after_commit.remove(
hash,
[BaoFilePart::Data, BaoFilePart::Outboard, BaoFilePart::Sizes],
);
Ok(())
}
fn update_inline_options(
&mut self,
db: &redb::Database,
options: InlineOptions,
reapply: bool,
) -> ActorResult<()> {
self.options.inline = options;
if reapply {
let mut delete_after_commit = Default::default();
let tx = db.begin_write()?;
{
let mut tables = Tables::new(&tx, &mut delete_after_commit)?;
let hashes = tables
.blobs
.iter()?
.map(|x| x.map(|(k, _)| k.value()))
.collect::<Result<Vec<_>, _>>()?;
for hash in hashes {
let guard = tables
.blobs
.get(hash)?
.ok_or_else(|| ActorError::Inconsistent("hash not found".to_owned()))?;
let entry = guard.value();
if let EntryState::Complete {
data_location,
outboard_location,
} = entry
{
let (data_location, data_size, data_location_changed) = match data_location
{
DataLocation::Owned(size) => {
if size <= self.options.inline.max_data_inlined {
let path = self.options.path.owned_data_path(&hash);
let data = std::fs::read(&path)?;
tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
tables.inline_data.insert(hash, data.as_slice())?;
(DataLocation::Inline(()), size, true)
} else {
(DataLocation::Owned(size), size, false)
}
}
DataLocation::Inline(()) => {
let guard = tables.inline_data.get(hash)?.ok_or_else(|| {
ActorError::Inconsistent("inline data missing".to_owned())
})?;
let data = guard.value();
let size = data.len() as u64;
if size > self.options.inline.max_data_inlined {
let path = self.options.path.owned_data_path(&hash);
std::fs::write(&path, data)?;
drop(guard);
tables.inline_data.remove(hash)?;
(DataLocation::Owned(size), size, true)
} else {
(DataLocation::Inline(()), size, false)
}
}
DataLocation::External(paths, size) => {
(DataLocation::External(paths, size), size, false)
}
};
let outboard_size = raw_outboard_size(data_size);
let (outboard_location, outboard_location_changed) = match outboard_location
{
OutboardLocation::Owned
if outboard_size <= self.options.inline.max_outboard_inlined =>
{
let path = self.options.path.owned_outboard_path(&hash);
let outboard = std::fs::read(&path)?;
tables
.delete_after_commit
.insert(hash, [BaoFilePart::Outboard]);
tables.inline_outboard.insert(hash, outboard.as_slice())?;
(OutboardLocation::Inline(()), true)
}
OutboardLocation::Inline(())
if outboard_size > self.options.inline.max_outboard_inlined =>
{
let guard = tables.inline_outboard.get(hash)?.ok_or_else(|| {
ActorError::Inconsistent("inline outboard missing".to_owned())
})?;
let outboard = guard.value();
let path = self.options.path.owned_outboard_path(&hash);
std::fs::write(&path, outboard)?;
drop(guard);
tables.inline_outboard.remove(hash)?;
(OutboardLocation::Owned, true)
}
x => (x, false),
};
drop(guard);
if data_location_changed || outboard_location_changed {
tables.blobs.insert(
hash,
EntryState::Complete {
data_location,
outboard_location,
},
)?;
}
}
}
}
tx.commit()?;
delete_after_commit.apply_and_clear(&self.options.path);
}
Ok(())
}
fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>, force: bool) -> ActorResult<()> {
for hash in hashes {
if self.temp.as_ref().read().unwrap().contains(&hash) {
continue;
}
if !force && self.protected.contains(&hash) {
tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
continue;
}
tracing::debug!("deleting {}", &hash.to_hex()[..8]);
self.handles.remove(&hash);
if let Some(entry) = tables.blobs.remove(hash)? {
match entry.value() {
EntryState::Complete {
data_location,
outboard_location,
} => {
match data_location {
DataLocation::Inline(_) => {
tables.inline_data.remove(hash)?;
}
DataLocation::Owned(_) => {
tables.delete_after_commit.insert(hash, [BaoFilePart::Data]);
}
DataLocation::External(_, _) => {}
}
match outboard_location {
OutboardLocation::Inline(_) => {
tables.inline_outboard.remove(hash)?;
}
OutboardLocation::Owned => {
tables
.delete_after_commit
.insert(hash, [BaoFilePart::Outboard]);
}
OutboardLocation::NotNeeded => {}
}
}
EntryState::Partial { .. } => {
tables.delete_after_commit.insert(
hash,
[BaoFilePart::Outboard, BaoFilePart::Data, BaoFilePart::Sizes],
);
}
}
}
}
Ok(())
}
fn on_complete(&mut self, tables: &mut Tables, entry: BaoFileHandle) -> ActorResult<()> {
let hash = entry.hash();
let mut info = None;
tracing::trace!("on_complete({})", hash.to_hex());
entry.transform(|state| {
tracing::trace!("on_complete transform {:?}", state);
let entry = match complete_storage(
state,
&hash,
&self.options.path,
&self.options.inline,
tables.delete_after_commit,
)? {
Ok(entry) => {
info = Some((
entry.data_size(),
entry.data.mem().cloned(),
entry.outboard_size(),
entry.outboard.mem().cloned(),
));
entry
}
            Err(entry) => {
                // the storage was already complete, nothing to do
                entry
            }
};
Ok(BaoFileStorage::Complete(entry))
})?;
if let Some((data_size, data, outboard_size, outboard)) = info {
let data_location = if data.is_some() {
DataLocation::Inline(())
} else {
DataLocation::Owned(data_size)
};
let outboard_location = if outboard_size == 0 {
OutboardLocation::NotNeeded
} else if outboard.is_some() {
OutboardLocation::Inline(())
} else {
OutboardLocation::Owned
};
{
tracing::debug!(
"inserting complete entry for {}, {} bytes",
hash.to_hex(),
data_size,
);
let entry = tables
.blobs()
.get(hash)?
.map(|x| x.value())
.unwrap_or_default();
let entry = entry.union(EntryState::Complete {
data_location,
outboard_location,
})?;
tables.blobs.insert(hash, entry)?;
if let Some(data) = data {
tables.inline_data.insert(hash, data.as_ref())?;
}
if let Some(outboard) = outboard {
tables.inline_outboard.insert(hash, outboard.as_ref())?;
}
}
}
Ok(())
}
fn handle_toplevel(&mut self, db: &redb::Database, msg: ActorMessage) -> ActorResult<()> {
match msg {
ActorMessage::UpdateInlineOptions {
inline_options,
reapply,
tx,
} => {
let res = self.update_inline_options(db, inline_options, reapply);
tx.send(res?).ok();
}
ActorMessage::Fsck {
repair,
progress,
tx,
} => {
let res = self.consistency_check(db, repair, progress);
tx.send(res).ok();
}
ActorMessage::Sync { tx } => {
tx.send(()).ok();
}
x => {
return Err(ActorError::Inconsistent(format!(
"unexpected message for handle_toplevel: {:?}",
x
)))
}
}
Ok(())
}
fn handle_readonly(
&mut self,
tables: &impl ReadableTables,
msg: ActorMessage,
) -> ActorResult<std::result::Result<(), ActorMessage>> {
match msg {
ActorMessage::Get { hash, tx } => {
let res = self.get(tables, hash);
tx.send(res).ok();
}
ActorMessage::GetOrCreate { hash, tx } => {
let res = self.get_or_create(tables, hash);
tx.send(res).ok();
}
ActorMessage::EntryStatus { hash, tx } => {
let res = self.entry_status(tables, hash);
tx.send(res).ok();
}
ActorMessage::Blobs { filter, tx } => {
let res = self.blobs(tables, filter);
tx.send(res).ok();
}
ActorMessage::Tags { filter, tx } => {
let res = self.tags(tables, filter);
tx.send(res).ok();
}
ActorMessage::GcStart { tx } => {
self.protected.clear();
self.handles.retain(|_, weak| weak.is_live());
tx.send(()).ok();
}
ActorMessage::Dump => {
dump(tables).ok();
}
#[cfg(test)]
ActorMessage::EntryState { hash, tx } => {
tx.send(self.entry_state(tables, hash)).ok();
}
ActorMessage::GetFullEntryState { hash, tx } => {
let res = self.get_full_entry_state(tables, hash);
tx.send(res).ok();
}
x => return Ok(Err(x)),
}
Ok(Ok(()))
}
fn handle_readwrite(
&mut self,
tables: &mut Tables,
msg: ActorMessage,
) -> ActorResult<std::result::Result<(), ActorMessage>> {
match msg {
ActorMessage::Import { cmd, tx } => {
let res = self.import(tables, cmd);
tx.send(res).ok();
}
ActorMessage::SetTag { tag, value, tx } => {
let res = self.set_tag(tables, tag, value);
tx.send(res).ok();
}
ActorMessage::CreateTag { hash, tx } => {
let res = self.create_tag(tables, hash);
tx.send(res).ok();
}
ActorMessage::Delete { hashes, tx } => {
let res = self.delete(tables, hashes, true);
tx.send(res).ok();
}
ActorMessage::GcDelete { hashes, tx } => {
let res = self.delete(tables, hashes, false);
tx.send(res).ok();
}
ActorMessage::OnComplete { handle } => {
let res = self.on_complete(tables, handle);
res.ok();
}
ActorMessage::Export { cmd, tx } => {
self.export(tables, cmd, tx)?;
}
ActorMessage::OnMemSizeExceeded { hash } => {
let res = self.on_mem_size_exceeded(tables, hash);
res.ok();
}
ActorMessage::Dump => {
let res = dump(tables);
res.ok();
}
ActorMessage::SetFullEntryState { hash, entry, tx } => {
let res = self.set_full_entry_state(tables, hash, entry);
tx.send(res).ok();
}
msg => {
if let Err(msg) = self.handle_readonly(tables, msg)? {
return Ok(Err(msg));
}
}
}
Ok(Ok(()))
}
}
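/// Copy an owned data file to the export target, reporting progress at the
/// start and the end. The temp tag keeps the entry alive for the duration
/// of the copy.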
fn export_file_copy(
temp_tag: TempTag,
path: PathBuf,
size: u64,
target: PathBuf,
progress: ExportProgressCb,
) -> ActorResult<()> {
progress(0)?;
reflink_copy::reflink_or_copy(path, target)?;
progress(size)?;
drop(temp_tag);
Ok(())
}
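/// Dump the contents of all tables to stdout, for debugging.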
fn dump(tables: &impl ReadableTables) -> ActorResult<()> {
for e in tables.blobs().iter()? {
let (k, v) = e?;
let k = k.value();
let v = v.value();
println!("blobs: {} -> {:?}", k.to_hex(), v);
}
for e in tables.tags().iter()? {
let (k, v) = e?;
let k = k.value();
let v = v.value();
println!("tags: {} -> {:?}", k, v);
}
for e in tables.inline_data().iter()? {
let (k, v) = e?;
let k = k.value();
let v = v.value();
println!("inline_data: {} -> {:?}", k.to_hex(), v.len());
}
for e in tables.inline_outboard().iter()? {
let (k, v) = e?;
let k = k.value();
let v = v.value();
println!("inline_outboard: {} -> {:?}", k.to_hex(), v.len());
}
Ok(())
}
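/// Load the data of a complete entry from its recorded location, as either
/// inline bytes or an open file together with its size.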
fn load_data(
tables: &impl ReadableTables,
options: &PathOptions,
location: DataLocation<(), u64>,
hash: &Hash,
) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
Ok(match location {
DataLocation::Inline(()) => {
let Some(data) = tables.inline_data().get(hash)? else {
            return Err(ActorError::Inconsistent(format!(
                "{} should have inline data but does not",
                hash.to_hex()
            )));
};
MemOrFile::Mem(Bytes::copy_from_slice(data.value()))
}
DataLocation::Owned(data_size) => {
let path = options.owned_data_path(hash);
let Ok(file) = std::fs::File::open(&path) else {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("file not found: {}", path.display()),
)
.into());
};
MemOrFile::File((file, data_size))
}
DataLocation::External(paths, data_size) => {
if paths.is_empty() {
return Err(ActorError::Inconsistent(
"external data location must not be empty".into(),
));
}
let path = &paths[0];
let Ok(file) = std::fs::File::open(path) else {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("external file not found: {}", path.display()),
)
.into());
};
MemOrFile::File((file, data_size))
}
})
}
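/// Load the outboard of a complete entry from its recorded location.
///
/// For `NotNeeded` this returns empty bytes; for owned outboards the size
/// is computed from the data size.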
fn load_outboard(
tables: &impl ReadableTables,
options: &PathOptions,
location: OutboardLocation,
size: u64,
hash: &Hash,
) -> ActorResult<MemOrFile<Bytes, (std::fs::File, u64)>> {
Ok(match location {
OutboardLocation::NotNeeded => MemOrFile::Mem(Bytes::new()),
OutboardLocation::Inline(_) => {
let Some(outboard) = tables.inline_outboard().get(hash)? else {
            return Err(ActorError::Inconsistent(format!(
                "{} should have inline outboard but does not",
                hash.to_hex()
            )));
};
MemOrFile::Mem(Bytes::copy_from_slice(outboard.value()))
}
OutboardLocation::Owned => {
let outboard_size = raw_outboard_size(size);
let path = options.owned_outboard_path(hash);
let Ok(file) = std::fs::File::open(&path) else {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("file not found: {} size={}", path.display(), outboard_size),
)
.into());
};
MemOrFile::File((file, outboard_size))
}
})
}
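/// Convert a possibly incomplete storage into a complete one.
///
/// Decides between inline and owned locations for data and outboard based
/// on `inline_options`, and schedules files that became redundant for
/// deletion after the transaction commits. Returns the storage as the
/// inner `Err` if it was already complete, so the caller can tell that
/// nothing changed.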
fn complete_storage(
storage: BaoFileStorage,
hash: &Hash,
path_options: &PathOptions,
inline_options: &InlineOptions,
delete_after_commit: &mut DeleteSet,
) -> ActorResult<std::result::Result<CompleteStorage, CompleteStorage>> {
let (data, outboard, _sizes) = match storage {
BaoFileStorage::Complete(c) => return Ok(Err(c)),
BaoFileStorage::IncompleteMem(storage) => {
let (data, outboard, sizes) = storage.into_parts();
(
MemOrFile::Mem(Bytes::from(data.into_parts().0)),
MemOrFile::Mem(Bytes::from(outboard.into_parts().0)),
MemOrFile::Mem(Bytes::from(sizes.to_vec())),
)
}
BaoFileStorage::IncompleteFile(storage) => {
let (data, outboard, sizes) = storage.into_parts();
(
MemOrFile::File(data),
MemOrFile::File(outboard),
MemOrFile::File(sizes),
)
}
};
let data_size = data.size()?.unwrap();
let outboard_size = outboard.size()?.unwrap();
debug_assert!(raw_outboard_size(data_size) == outboard_size);
let data = if data_size <= inline_options.max_data_inlined {
match data {
MemOrFile::File(data) => {
let mut buf = vec![0; data_size as usize];
data.read_at(0, &mut buf)?;
delete_after_commit.insert(*hash, [BaoFilePart::Data]);
MemOrFile::Mem(Bytes::from(buf))
}
MemOrFile::Mem(data) => MemOrFile::Mem(data),
}
} else {
delete_after_commit.remove(*hash, [BaoFilePart::Data]);
match data {
MemOrFile::Mem(data) => {
let path = path_options.owned_data_path(hash);
let file = overwrite_and_sync(&path, &data)?;
MemOrFile::File((file, data_size))
}
MemOrFile::File(data) => MemOrFile::File((data, data_size)),
}
};
let outboard = if outboard_size == 0 {
Default::default()
} else if outboard_size <= inline_options.max_outboard_inlined {
match outboard {
MemOrFile::File(outboard) => {
let mut buf = vec![0; outboard_size as usize];
outboard.read_at(0, &mut buf)?;
drop(outboard);
delete_after_commit.insert(*hash, [BaoFilePart::Outboard]);
MemOrFile::Mem(Bytes::from(buf))
}
MemOrFile::Mem(outboard) => MemOrFile::Mem(outboard),
}
} else {
delete_after_commit.remove(*hash, [BaoFilePart::Outboard]);
match outboard {
MemOrFile::Mem(outboard) => {
let path = path_options.owned_outboard_path(hash);
let file = overwrite_and_sync(&path, &outboard)?;
MemOrFile::File((file, outboard_size))
}
MemOrFile::File(outboard) => MemOrFile::File((outboard, outboard_size)),
}
};
delete_after_commit.insert(*hash, [BaoFilePart::Sizes]);
Ok(Ok(CompleteStorage { data, outboard }))
}