use std::{
fmt,
fs::{self, File, OpenOptions},
io::{self, Seek, Write},
path::PathBuf,
sync::Arc,
};
use bao_tree::{
io::outboard::{PreOrderMemOutboard, PreOrderOutboard},
BaoTree, ChunkNum,
};
use bytes::Bytes;
use genawaiter::sync::Gen;
use irpc::{
channel::{mpsc, none::NoReceiver},
Channels, WithChannels,
};
use n0_future::{stream, Stream, StreamExt};
use ref_cast::RefCast;
use smallvec::SmallVec;
use tracing::{instrument, trace};
use super::{meta::raw_outboard_size, options::Options, TaskContext};
use crate::{
api::{
blobs::{AddProgressItem, ImportMode},
proto::{
HashSpecific, ImportByteStreamMsg, ImportByteStreamRequest, ImportByteStreamUpdate,
ImportBytesMsg, ImportBytesRequest, ImportPathMsg, ImportPathRequest, Request, Scope,
},
},
store::{
util::{MemOrFile, DD},
IROH_BLOCK_SIZE,
},
util::{outboard_with_progress::init_outboard, sink::Sink},
BlobFormat, Hash,
};
/// The data of a blob that is being imported, together with where it lives.
///
/// The trailing `u64` of the file-backed variants is the size in bytes that
/// [`ImportSource::size`] reports.
pub enum ImportSource {
    /// Data stored in a temp file: its path, an open handle to it, and its size.
    TempFile(PathBuf, File, u64),
    /// Data stored in a file that is referenced where it is (built for
    /// `ImportMode::TryReference`): its path, an open handle to it, and its size.
    External(PathBuf, File, u64),
    /// Data held entirely in memory.
    Memory(Bytes),
}
impl std::fmt::Debug for ImportSource {
    /// Prints the variant name followed by path and size for file-backed
    /// sources, or by the byte length for in-memory data. The file handle is
    /// intentionally left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (variant, path, size) = match self {
            // In-memory data has a different shape, so it is handled right away.
            Self::Memory(data) => return f.debug_tuple("Memory").field(&data.len()).finish(),
            Self::TempFile(path, _, size) => ("TempFile", path, size),
            Self::External(path, _, size) => ("External", path, size),
        };
        f.debug_tuple(variant).field(path).field(size).finish()
    }
}
impl ImportSource {
    /// One-line description of the source: the variant name with the path for
    /// file-backed sources, or with the byte length for in-memory data.
    pub fn fmt_short(&self) -> String {
        match self {
            Self::Memory(data) => format!("Memory({})", data.len()),
            Self::TempFile(path, ..) => format!("TempFile({})", path.display()),
            Self::External(path, ..) => format!("External({})", path.display()),
        }
    }

    /// Returns `true` if the data is held in memory.
    fn is_mem(&self) -> bool {
        matches!(self, Self::Memory(_))
    }

    /// Borrows the data as a reader: a cursor over the bytes for in-memory
    /// data, or the open file handle for file-backed data.
    fn read(&self) -> MemOrFile<std::io::Cursor<&[u8]>, &File> {
        match self {
            Self::Memory(data) => MemOrFile::Mem(std::io::Cursor::new(data.as_ref())),
            Self::TempFile(_, file, _) | Self::External(_, file, _) => MemOrFile::File(file),
        }
    }

    /// Size of the data in bytes: the recorded size for file-backed sources,
    /// the buffer length for in-memory data.
    fn size(&self) -> u64 {
        match self {
            Self::Memory(data) => data.len() as u64,
            Self::TempFile(.., size) | Self::External(.., size) => *size,
        }
    }
}
/// A fully prepared import: the data, its outboard and its hash, plus the
/// format and scope taken from the originating request.
pub struct ImportEntry {
    /// Hash of the data, as computed while building the outboard.
    pub hash: Hash,
    /// Blob format passed through from the import request.
    pub format: BlobFormat,
    /// Scope passed through from the import request.
    pub scope: Scope,
    /// Where the imported data lives.
    pub source: ImportSource,
    /// The outboard, either as in-memory bytes or as the path of an outboard file.
    pub outboard: MemOrFile<Bytes, PathBuf>,
}
impl std::fmt::Debug for ImportEntry {
    /// Prints all fields, using the short descriptions of `source` and
    /// `outboard` instead of their full contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            hash,
            format,
            scope,
            source,
            outboard,
        } = self;
        let source = DD(source.fmt_short());
        let outboard = DD(outboard.fmt_short());
        let mut out = f.debug_struct("ImportEntry");
        out.field("hash", hash);
        out.field("format", format);
        out.field("scope", scope);
        out.field("source", &source);
        out.field("outboard", &outboard);
        out.finish()
    }
}
/// Channel types used when an [`ImportEntry`] is sent as a [`Request`] message.
impl Channels<Request> for ImportEntry {
    // Progress items (`AddProgressItem`) are reported through this sender.
    type Tx = mpsc::Sender<AddProgressItem>;
    // There is no receiving half for this message.
    type Rx = NoReceiver;
}
/// An [`ImportEntry`] bundled with its channels and tracing span, ready to be
/// handed to the internal command queue.
pub type ImportEntryMsg = WithChannels<ImportEntry, Request>;
impl HashSpecific for ImportEntryMsg {
    /// Returns the hash of the wrapped [`ImportEntry`].
    fn hash(&self) -> Hash {
        self.inner.hash
    }
}
impl ImportEntry {
    /// Returns `true` only if both the data and the outboard are held in memory.
    pub fn is_mem(&self) -> bool {
        let Self {
            source, outboard, ..
        } = self;
        source.is_mem() && outboard.is_mem()
    }
}
/// Imports a blob that is already fully available as bytes.
///
/// Data small enough for `Options::is_inlined_all` takes the in-memory fast
/// path; everything else is fed through the byte stream import as a stream
/// with a single chunk.
#[instrument(skip_all, fields(data = cmd.data.len()))]
pub async fn import_bytes(cmd: ImportBytesMsg, ctx: Arc<TaskContext>) {
    let len = cmd.data.len() as u64;
    if ctx.options.is_inlined_all(len) {
        return import_bytes_tiny_outer(cmd, ctx).await;
    }
    let payload = cmd.data.clone();
    let request = ImportByteStreamRequest {
        format: cmd.format,
        scope: cmd.scope,
    };
    // A one-element stream: `Option` iterates over its single item.
    let stream = stream::iter(Some(Ok(payload)));
    import_byte_stream_mid(request, cmd.tx, cmd.span, stream, ctx).await;
}
/// Runs the in-memory import and forwards the outcome: a finished entry goes
/// to the internal command queue, an error goes back over the progress channel.
///
/// Failures to deliver either of the two are ignored.
async fn import_bytes_tiny_outer(mut cmd: ImportBytesMsg, ctx: Arc<TaskContext>) {
    let outcome = import_bytes_tiny_impl(cmd.inner, &mut cmd.tx).await;
    let entry = match outcome {
        Ok(entry) => entry,
        Err(cause) => {
            cmd.tx.send(cause.into()).await.ok();
            return;
        }
    };
    let msg = ImportEntryMsg {
        inner: entry,
        tx: cmd.tx,
        rx: cmd.rx,
        span: cmd.span,
    };
    ctx.internal_cmd_tx.send(msg.into()).await.ok();
}
async fn import_bytes_tiny_impl(
cmd: ImportBytesRequest,
tx: &mut mpsc::Sender<AddProgressItem>,
) -> io::Result<ImportEntry> {
let size = cmd.data.len() as u64;
tx.send(AddProgressItem::Size(size))
.await
.map_err(|_e| io::Error::other("error"))?;
tx.send(AddProgressItem::CopyDone)
.await
.map_err(|_e| io::Error::other("error"))?;
Ok(if raw_outboard_size(size) == 0 {
ImportEntry {
hash: Hash::new(&cmd.data),
format: cmd.format,
scope: cmd.scope,
source: ImportSource::Memory(cmd.data),
outboard: MemOrFile::empty(),
}
} else {
let outboard = PreOrderMemOutboard::create(&cmd.data, IROH_BLOCK_SIZE);
ImportEntry {
hash: outboard.root.into(),
format: cmd.format,
scope: cmd.scope,
source: ImportSource::Memory(cmd.data),
outboard: MemOrFile::Mem(Bytes::from(outboard.data)),
}
})
}
#[instrument(skip_all)]
pub async fn import_byte_stream(cmd: ImportByteStreamMsg, ctx: Arc<TaskContext>) {
let stream = into_stream(cmd.rx);
import_byte_stream_mid(cmd.inner, cmd.tx, cmd.span, stream, ctx).await
}
/// Adapts the update receiver into a stream of byte chunks.
///
/// `Bytes` updates are yielded as `Ok` chunks and `Done` ends the stream. If
/// the receiver ends without `Done`, or fails, a single error is yielded
/// (`UnexpectedEof` in the former case) and the stream ends.
fn into_stream(
    mut rx: mpsc::Receiver<ImportByteStreamUpdate>,
) -> impl Stream<Item = io::Result<Bytes>> {
    Gen::new(|co| async move {
        loop {
            let failure: io::Error = match rx.recv().await {
                Ok(Some(ImportByteStreamUpdate::Bytes(data))) => {
                    co.yield_(Ok(data)).await;
                    continue;
                }
                Ok(Some(ImportByteStreamUpdate::Done)) => return,
                Ok(None) => io::ErrorKind::UnexpectedEof.into(),
                Err(e) => e.into(),
            };
            // Both failure cases report the error once and then stop.
            co.yield_(Err(failure)).await;
            return;
        }
    })
}
/// Runs the byte stream import and forwards the outcome: a finished entry goes
/// to the internal command queue, an error goes back over the progress channel.
///
/// Failures to deliver either of the two are ignored.
async fn import_byte_stream_mid(
    request: ImportByteStreamRequest,
    mut tx: mpsc::Sender<AddProgressItem>,
    span: tracing::Span,
    stream: impl Stream<Item = io::Result<Bytes>> + Unpin,
    ctx: Arc<TaskContext>,
) {
    let options = ctx.options.clone();
    let entry = match import_byte_stream_impl(request, &mut tx, stream, options).await {
        Ok(entry) => entry,
        Err(cause) => {
            tx.send(cause.into()).await.ok();
            return;
        }
    };
    let msg = ImportEntryMsg {
        inner: entry,
        tx,
        rx: NoReceiver,
        span,
    };
    ctx.internal_cmd_tx.send(msg.into()).await.ok();
}
/// Consumes `stream` into an import source, announces its size and the end of
/// the copy phase, then computes the outboard.
///
/// # Errors
///
/// Propagates errors from reading the stream and from computing the outboard,
/// and fails with `io::Error::other("error")` if a progress item cannot be sent.
async fn import_byte_stream_impl(
    cmd: ImportByteStreamRequest,
    tx: &mut mpsc::Sender<AddProgressItem>,
    stream: impl Stream<Item = io::Result<Bytes>> + Unpin,
    options: Arc<Options>,
) -> io::Result<ImportEntry> {
    let ImportByteStreamRequest { format, scope } = cmd;
    let source = get_import_source(stream, tx, &options).await?;
    // The size is only known once the whole stream has been consumed.
    let announcements = [
        AddProgressItem::Size(source.size()),
        AddProgressItem::CopyDone,
    ];
    for item in announcements {
        tx.send(item)
            .await
            .map_err(|_e| io::Error::other("error"))?;
    }
    compute_outboard(source, format, scope, options, tx).await
}
/// Drains `stream` into an [`ImportSource`], keeping the data in memory while
/// it stays within `options.inline.max_data_inlined` and spilling it to a temp
/// file as soon as it does not.
///
/// `CopyProgress` items carrying the running byte count are pushed to `tx`
/// while the stream is consumed. `Size` and `CopyDone` are not sent here.
///
/// # Errors
///
/// Returns the first error yielded by `stream`, any I/O error from creating or
/// writing the temp file, or `io::Error::other("error")` if a progress item
/// cannot be sent.
async fn get_import_source(
    stream: impl Stream<Item = io::Result<Bytes>> + Unpin,
    tx: &mut mpsc::Sender<AddProgressItem>,
    options: &Options,
) -> io::Result<ImportSource> {
    // Fused because the stream can be polled again after it reported its end
    // (the single-chunk case below chains it behind the peeked chunk).
    let mut stream = stream.fuse();
    // Holds the chunks that were taken off the stream while peeking.
    let mut peek = SmallVec::<[_; 2]>::new();
    // No chunk at all: the blob is empty.
    let Some(first) = stream.next().await.transpose()? else {
        return Ok(ImportSource::Memory(Bytes::new()));
    };
    match stream.next().await.transpose()? {
        Some(second) => {
            peek.push(Ok(first));
            peek.push(Ok(second));
        }
        None => {
            // Exactly one chunk: if it may be inlined, hand it out as is
            // instead of copying it into a buffer.
            let size = first.len() as u64;
            if options.is_inlined_data(size) {
                return Ok(ImportSource::Memory(first));
            }
            peek.push(Ok(first));
        }
    };
    // Put the peeked chunks back in front of the remaining stream.
    let mut stream = stream::iter(peek).chain(stream);
    let mut size = 0;
    let mut data = Vec::new();
    let mut disk = None;
    // Phase 1: buffer in memory until the inline limit is exceeded.
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        size += chunk.len() as u64;
        if size > options.inline.max_data_inlined {
            let temp_path = options.path.temp_file_name();
            trace!("writing to temp file: {:?}", temp_path);
            // Opened for reading as well, since the handle is kept in the
            // returned `ImportSource::TempFile`.
            let mut file = fs::OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .open(&temp_path)?;
            // Move everything buffered so far, plus the current chunk, to disk.
            file.write_all(&data)?;
            file.write_all(&chunk)?;
            data.clear();
            disk = Some((file, temp_path));
            // Note: the chunk that crosses the limit produces no progress item.
            break;
        } else {
            data.extend_from_slice(&chunk);
        }
        // NOTE(review): this phase uses `try_send` while the disk phase uses
        // `send`; presumably `try_send` does not wait for channel capacity -
        // confirm against the irpc documentation.
        tx.try_send(AddProgressItem::CopyProgress(size))
            .await
            .map_err(|_e| io::Error::other("error"))?;
    }
    Ok(if let Some((mut file, temp_path)) = disk {
        // Phase 2: append the rest of the stream to the temp file.
        while let Some(chunk) = stream.next().await {
            let chunk = chunk?;
            file.write_all(&chunk)?;
            size += chunk.len() as u64;
            tx.send(AddProgressItem::CopyProgress(size))
                .await
                .map_err(|_e| io::Error::other("error"))?;
        }
        ImportSource::TempFile(temp_path, file, size)
    } else {
        // The stream ended without exceeding the inline limit.
        ImportSource::Memory(data.into())
    })
}
/// Newtype over the progress sender so that it can implement `Sink<ChunkNum>`.
///
/// `RefCast` together with `#[repr(transparent)]` lets a
/// `&mut mpsc::Sender<AddProgressItem>` be borrowed as `&mut OutboardProgress`
/// (see the `ref_cast_mut` call in `compute_outboard`).
#[derive(ref_cast::RefCast)]
#[repr(transparent)]
struct OutboardProgress(mpsc::Sender<AddProgressItem>);
impl Sink<ChunkNum> for OutboardProgress {
    type Error = irpc::channel::SendError;

    /// Reports `offset`, converted to bytes, as an `OutboardProgress` item via
    /// `try_send` on the wrapped sender.
    async fn send(&mut self, offset: ChunkNum) -> std::result::Result<(), Self::Error> {
        let Self(sender) = self;
        let item = AddProgressItem::OutboardProgress(offset.to_bytes());
        sender.try_send(item).await?;
        Ok(())
    }
}
/// Computes hash and outboard for `source` and bundles everything into an
/// [`ImportEntry`].
///
/// The outboard is written to a fresh temp file when its size exceeds
/// `options.inline.max_outboard_inlined`, and kept in memory otherwise.
/// Outboard progress is reported through `tx`.
///
/// # Errors
///
/// Fails if the source cannot be rewound, if the outboard file cannot be
/// created, or if `init_outboard` fails.
async fn compute_outboard(
    source: ImportSource,
    format: BlobFormat,
    scope: Scope,
    options: Arc<Options>,
    tx: &mut mpsc::Sender<AddProgressItem>,
) -> io::Result<ImportEntry> {
    let size = source.size();
    let outboard_size = raw_outboard_size(size);
    let tree = BaoTree::new(size, IROH_BLOCK_SIZE);
    // Placeholder root; the real one is read back from the outboard after
    // `init_outboard` has run.
    let root = bao_tree::blake3::Hash::from_bytes([0; 32]);
    let progress = OutboardProgress::ref_cast_mut(tx);
    // Start reading from the beginning of the source.
    let mut reader = source.read();
    reader.rewind()?;
    let keep_in_memory = outboard_size <= options.inline.max_outboard_inlined;
    let (hash, outboard) = if keep_in_memory {
        trace!("Creating outboard in memory");
        let mut buffer: Vec<u8> = Vec::new();
        let mut target = PreOrderOutboard {
            tree,
            root,
            data: &mut buffer,
        };
        init_outboard(reader, &mut target, progress).await??;
        (target.root, MemOrFile::Mem(Bytes::from(buffer)))
    } else {
        let outboard_path = options.path.temp_file_name();
        trace!("Creating outboard file in {}", outboard_path.display());
        let mut outboard_file = File::create(&outboard_path)?;
        let mut target = PreOrderOutboard {
            tree,
            root,
            data: &mut outboard_file,
        };
        init_outboard(reader, &mut target, progress).await??;
        (target.root, MemOrFile::File(outboard_path))
    };
    Ok(ImportEntry {
        hash: hash.into(),
        format,
        scope,
        source,
        outboard,
    })
}
/// Imports a blob from a file on disk and forwards the outcome: a finished
/// entry goes to the internal command queue, an error goes back over the
/// progress channel.
///
/// Failures to deliver either of the two are ignored.
#[instrument(skip_all, fields(path = %cmd.path.display()))]
pub async fn import_path(mut cmd: ImportPathMsg, context: Arc<TaskContext>) {
    let options = context.options.clone();
    let inner = match import_path_impl(cmd.inner, &mut cmd.tx, options).await {
        Ok(inner) => inner,
        Err(cause) => {
            cmd.tx.send(cause.into()).await.ok();
            return;
        }
    };
    let msg = ImportEntryMsg {
        inner,
        tx: cmd.tx,
        rx: cmd.rx,
        span: cmd.span,
    };
    context.internal_cmd_tx.send(msg.into()).await.ok();
}
/// Prepares the import of the file at `cmd.path`.
///
/// Depending on the file size and the import mode, the data is
/// - read into memory, if it is at most `options.inline.max_data_inlined` bytes,
/// - referenced where it is, for `ImportMode::TryReference`, or
/// - reflinked or copied into a temp file otherwise.
///
/// `Size` is sent once the file size is known and `CopyDone` once the data is
/// in place. Both are sent on every path, including the reference path where
/// nothing is copied, so consumers can rely on seeing them. The outboard is
/// computed afterwards.
///
/// # Errors
///
/// Fails with `InvalidInput` if the path is not absolute or is neither a file
/// nor a symlink, with the underlying I/O error if the file cannot be
/// inspected, read, opened or copied, and with `io::Error::other("error")` if
/// a progress item cannot be sent.
async fn import_path_impl(
    cmd: ImportPathRequest,
    tx: &mut mpsc::Sender<AddProgressItem>,
    options: Arc<Options>,
) -> io::Result<ImportEntry> {
    let ImportPathRequest {
        path,
        mode,
        format,
        scope: batch,
    } = cmd;
    if !path.is_absolute() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "path must be absolute",
        ));
    }
    if !path.is_file() && !path.is_symlink() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "path is not a file or symlink",
        ));
    }
    let size = path.metadata()?.len();
    tx.send(AddProgressItem::Size(size))
        .await
        .map_err(|_e| io::Error::other("error"))?;
    let import_source = if size <= options.inline.max_data_inlined {
        let data = std::fs::read(path)?;
        ImportSource::Memory(data.into())
    } else if mode == ImportMode::TryReference {
        // The file stays where it is. The handle is opened here because it is
        // needed to compute the outboard; if that fails the import cannot proceed.
        let file = OpenOptions::new().read(true).open(&path)?;
        ImportSource::External(path, file, size)
    } else {
        let temp_path = options.path.temp_file_name();
        // `reflink_or_copy` returns `None` when the reflink succeeded.
        if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() {
            trace!("reflinked {} to {}", path.display(), temp_path.display());
        } else {
            trace!("copied {} to {}", path.display(), temp_path.display());
        }
        let file = OpenOptions::new().read(true).open(&temp_path)?;
        ImportSource::TempFile(temp_path, file, size)
    };
    // Sent once for every branch above, so the reference path reports the end
    // of the copy phase just like the others.
    tx.send(AddProgressItem::CopyDone)
        .await
        .map_err(|_e| io::Error::other("error"))?;
    compute_outboard(import_source, format, batch, options, tx).await
}
#[cfg(test)]
mod tests {
    use bao_tree::io::outboard::PreOrderMemOutboard;
    use irpc::RpcMessage;
    use n0_future::stream;
    use testresult::TestResult;

    use super::*;
    use crate::{
        api::proto::BoxedByteStream,
        store::fs::options::{InlineOptions, PathOptions},
    };

    /// Receives from `recv` until it yields `None` and returns all items.
    async fn drain<T: RpcMessage>(mut recv: mpsc::Receiver<T>) -> TestResult<Vec<T>> {
        let mut res = Vec::new();
        while let Some(item) = recv.recv().await? {
            res.push(item);
        }
        Ok(res)
    }

    /// Asserts that the progress events every import must produce are present.
    fn assert_expected_progress(progress: &[AddProgressItem]) {
        assert!(progress
            .iter()
            .any(|x| matches!(x, AddProgressItem::Size { .. })));
        assert!(progress
            .iter()
            .any(|x| matches!(x, AddProgressItem::CopyDone)));
    }

    /// Splits `data` into consecutive chunks of at most `chunk_size` bytes.
    fn chunk_bytes(data: Bytes, chunk_size: usize) -> impl Iterator<Item = Bytes> {
        assert!(chunk_size > 0, "Chunk size must be positive");
        (0..data.len())
            .step_by(chunk_size)
            .map(move |i| data.slice(i..std::cmp::min(i + chunk_size, data.len())))
    }

    /// Loads the outboard bytes, reading the outboard file if it is not in memory.
    fn load_outboard(outboard: &MemOrFile<Bytes, PathBuf>) -> io::Result<Bytes> {
        Ok(match outboard {
            MemOrFile::Mem(data) => data.clone(),
            MemOrFile::File(path) => std::fs::read(path)?.into(),
        })
    }

    /// Imports `data` as a stream of 999 byte chunks and checks the resulting
    /// outboard and progress events.
    async fn test_import_byte_stream_task(data: Bytes, options: Arc<Options>) -> TestResult<()> {
        let stream: BoxedByteStream =
            Box::pin(stream::iter(chunk_bytes(data.clone(), 999).map(Ok)));
        let expected_outboard = PreOrderMemOutboard::create(data.as_ref(), IROH_BLOCK_SIZE);
        let (mut tx, rx) = mpsc::channel(1024 * 1024);
        let cmd = ImportByteStreamRequest {
            format: BlobFormat::Raw,
            scope: Default::default(),
        };
        // Propagate the error so that a failure reports its actual cause.
        let ImportEntry { outboard, .. } =
            import_byte_stream_impl(cmd, &mut tx, stream, options).await?;
        // Close the channel so that `drain` terminates.
        drop(tx);
        let actual_outboard = load_outboard(&outboard)?;
        assert_eq!(expected_outboard.data.as_slice(), actual_outboard.as_ref());
        let progress = drain(rx).await?;
        assert_expected_progress(&progress);
        Ok(())
    }

    /// Writes `data` to a file, imports it in copy mode and checks the
    /// resulting outboard and progress events.
    async fn test_import_file_task(data: Bytes, options: Arc<Options>) -> TestResult<()> {
        let path = options.path.temp_file_name();
        std::fs::write(&path, &data)?;
        let expected_outboard = PreOrderMemOutboard::create(data.as_ref(), IROH_BLOCK_SIZE);
        let (mut tx, rx) = mpsc::channel(1024 * 1024);
        let cmd = ImportPathRequest {
            path,
            mode: ImportMode::Copy,
            format: BlobFormat::Raw,
            scope: Scope::default(),
        };
        // Propagate the error so that a failure reports its actual cause.
        let ImportEntry { outboard, .. } = import_path_impl(cmd, &mut tx, options).await?;
        // Close the channel so that `drain` terminates.
        drop(tx);
        let actual_outboard = load_outboard(&outboard)?;
        assert_eq!(expected_outboard.data.as_slice(), actual_outboard.as_ref());
        let progress = drain(rx).await?;
        assert_expected_progress(&progress);
        Ok(())
    }

    #[tokio::test]
    async fn smoke() -> TestResult<()> {
        let dir = tempfile::tempdir()?;
        std::fs::create_dir_all(dir.path().join("data"))?;
        std::fs::create_dir_all(dir.path().join("temp"))?;
        let options = Arc::new(Options {
            inline: InlineOptions {
                max_data_inlined: 1024 * 16,
                max_outboard_inlined: 1024 * 16,
            },
            batch: Default::default(),
            path: PathOptions::new(dir.path()),
            gc: None,
        });
        // Sizes on both sides of the inline limit, plus empty and large blobs.
        let sizes = [
            0,
            1024,
            1024 * 16 - 1,
            1024 * 16,
            1024 * 16 + 1,
            1024 * 1024,
            1024 * 1024 * 8,
        ];
        for size in sizes {
            let data = Bytes::from(vec![0; size]);
            test_import_byte_stream_task(data.clone(), options.clone()).await?;
            test_import_file_task(data, options.clone()).await?;
        }
        Ok(())
    }
}