1#![allow(missing_docs)]
4
5use std::{
6 future::Future,
7 net::SocketAddr,
8 path::Path,
9 pin::Pin,
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc,
13 },
14 task::{ready, Poll},
15};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use iroh::NodeAddr;
20use iroh_blobs::{
21 api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress},
22 Hash,
23};
24use irpc::rpc::Handler;
25use n0_future::{
26 task::{self, AbortOnDropHandle},
27 FutureExt, Stream, StreamExt,
28};
29
30use self::{
31 actor::RpcActor,
32 protocol::{
33 AddrInfoOptions, AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest,
34 AuthorGetDefaultRequest, AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest,
35 CloseRequest, CreateRequest, DelRequest, DocsProtocol, DropRequest,
36 GetDownloadPolicyRequest, GetExactRequest, GetManyRequest, GetSyncPeersRequest,
37 ImportRequest, LeaveRequest, ListRequest, OpenRequest, SetDownloadPolicyRequest,
38 SetHashRequest, SetRequest, ShareMode, ShareRequest, StartSyncRequest, StatusRequest,
39 SubscribeRequest,
40 },
41};
42use crate::{
43 actor::OpenState,
44 engine::{Engine, LiveEvent},
45 store::{DownloadPolicy, Query},
46 Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes,
47};
48
49pub(crate) mod actor;
50pub mod protocol;
51
52pub type RpcError = serde_error::Error;
53pub type RpcResult<T> = std::result::Result<T, RpcError>;
54
55type Client = irpc::Client<DocsProtocol>;
56
57#[derive(Debug, Clone)]
59pub struct DocsApi {
60 pub(crate) inner: Client,
61}
62
63impl DocsApi {
64 pub fn spawn(engine: Arc<Engine>) -> Self {
66 RpcActor::spawn(engine)
67 }
68
69 pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Result<DocsApi> {
71 Ok(DocsApi {
72 inner: Client::quinn(endpoint, addr),
73 })
74 }
75
76 pub fn listen(&self, endpoint: quinn::Endpoint) -> Result<AbortOnDropHandle<()>> {
78 let local = self
79 .inner
80 .as_local()
81 .context("cannot listen on remote API")?;
82 let handler: Handler<DocsProtocol> = Arc::new(move |msg, _rx, tx| {
83 let local = local.clone();
84 Box::pin(match msg {
85 DocsProtocol::Open(msg) => local.send((msg, tx)),
86 DocsProtocol::Close(msg) => local.send((msg, tx)),
87 DocsProtocol::Status(msg) => local.send((msg, tx)),
88 DocsProtocol::List(msg) => local.send((msg, tx)),
89 DocsProtocol::Create(msg) => local.send((msg, tx)),
90 DocsProtocol::Drop(msg) => local.send((msg, tx)),
91 DocsProtocol::Import(msg) => local.send((msg, tx)),
92 DocsProtocol::Set(msg) => local.send((msg, tx)),
93 DocsProtocol::SetHash(msg) => local.send((msg, tx)),
94 DocsProtocol::Get(msg) => local.send((msg, tx)),
95 DocsProtocol::GetExact(msg) => local.send((msg, tx)),
96 DocsProtocol::Del(msg) => local.send((msg, tx)),
99 DocsProtocol::StartSync(msg) => local.send((msg, tx)),
100 DocsProtocol::Leave(msg) => local.send((msg, tx)),
101 DocsProtocol::Share(msg) => local.send((msg, tx)),
102 DocsProtocol::Subscribe(msg) => local.send((msg, tx)),
103 DocsProtocol::GetDownloadPolicy(msg) => local.send((msg, tx)),
104 DocsProtocol::SetDownloadPolicy(msg) => local.send((msg, tx)),
105 DocsProtocol::GetSyncPeers(msg) => local.send((msg, tx)),
106 DocsProtocol::AuthorList(msg) => local.send((msg, tx)),
107 DocsProtocol::AuthorCreate(msg) => local.send((msg, tx)),
108 DocsProtocol::AuthorGetDefault(msg) => local.send((msg, tx)),
109 DocsProtocol::AuthorSetDefault(msg) => local.send((msg, tx)),
110 DocsProtocol::AuthorImport(msg) => local.send((msg, tx)),
111 DocsProtocol::AuthorExport(msg) => local.send((msg, tx)),
112 DocsProtocol::AuthorDelete(msg) => local.send((msg, tx)),
113 })
114 });
115 let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler));
116 Ok(AbortOnDropHandle::new(join_handle))
117 }
118
119 pub async fn author_create(&self) -> Result<AuthorId> {
126 let response = self.inner.rpc(AuthorCreateRequest).await??;
127 Ok(response.author_id)
128 }
129
130 pub async fn author_default(&self) -> Result<AuthorId> {
137 let response = self.inner.rpc(AuthorGetDefaultRequest).await??;
138 Ok(response.author_id)
139 }
140
141 pub async fn author_set_default(&self, author_id: AuthorId) -> Result<()> {
148 self.inner
149 .rpc(AuthorSetDefaultRequest { author_id })
150 .await??;
151 Ok(())
152 }
153
154 pub async fn author_list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
158 let stream = self.inner.server_streaming(AuthorListRequest, 64).await?;
159 Ok(stream.into_stream().map(|res| match res {
160 Err(err) => Err(err.into()),
161 Ok(Err(err)) => Err(err.into()),
162 Ok(Ok(res)) => Ok(res.author_id),
163 }))
164 }
165
166 pub async fn author_export(&self, author: AuthorId) -> Result<Option<Author>> {
170 let response = self.inner.rpc(AuthorExportRequest { author }).await??;
171 Ok(response.author)
172 }
173
174 pub async fn author_import(&self, author: Author) -> Result<()> {
178 self.inner.rpc(AuthorImportRequest { author }).await??;
179 Ok(())
180 }
181
182 pub async fn author_delete(&self, author: AuthorId) -> Result<()> {
188 self.inner.rpc(AuthorDeleteRequest { author }).await??;
189 Ok(())
190 }
191
192 pub async fn create(&self) -> Result<Doc> {
194 let response = self.inner.rpc(CreateRequest).await??;
195 Ok(Doc::new(self.inner.clone(), response.id))
196 }
197
198 pub async fn drop_doc(&self, doc_id: NamespaceId) -> Result<()> {
204 self.inner.rpc(DropRequest { doc_id }).await??;
205 Ok(())
206 }
207
208 pub async fn import_namespace(&self, capability: Capability) -> Result<Doc> {
212 let response = self.inner.rpc(ImportRequest { capability }).await??;
213 Ok(Doc::new(self.inner.clone(), response.doc_id))
214 }
215
216 pub async fn import(&self, ticket: DocTicket) -> Result<Doc> {
218 let DocTicket { capability, nodes } = ticket;
219 let doc = self.import_namespace(capability).await?;
220 doc.start_sync(nodes).await?;
221 Ok(doc)
222 }
223
224 pub async fn import_and_subscribe(
231 &self,
232 ticket: DocTicket,
233 ) -> Result<(Doc, impl Stream<Item = Result<LiveEvent>>)> {
234 let DocTicket { capability, nodes } = ticket;
235 let response = self.inner.rpc(ImportRequest { capability }).await??;
236 let doc = Doc::new(self.inner.clone(), response.doc_id);
237 let events = doc.subscribe().await?;
238 doc.start_sync(nodes).await?;
239 Ok((doc, events))
240 }
241
242 pub async fn list(
244 &self,
245 ) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>> + Unpin + Send + 'static>
246 {
247 let stream = self.inner.server_streaming(ListRequest, 64).await?;
248 let stream = Box::pin(stream.into_stream());
249 Ok(stream.map(|res| match res {
250 Err(err) => Err(err.into()),
251 Ok(Err(err)) => Err(err.into()),
252 Ok(Ok(res)) => Ok((res.id, res.capability)),
253 }))
254 }
255
256 pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc>> {
260 self.inner.rpc(OpenRequest { doc_id: id }).await??;
261 Ok(Some(Doc::new(self.inner.clone(), id)))
262 }
263}
264
265#[derive(Debug, Clone)]
267pub struct Doc {
268 inner: Client,
269 namespace_id: NamespaceId,
270 closed: Arc<AtomicBool>,
271}
272
273impl Doc {
274 fn new(inner: Client, namespace_id: NamespaceId) -> Self {
275 Self {
276 inner,
277 namespace_id,
278 closed: Default::default(),
279 }
280 }
281
282 pub fn id(&self) -> NamespaceId {
284 self.namespace_id
285 }
286
287 pub async fn close(&self) -> Result<()> {
289 self.closed.store(true, Ordering::Relaxed);
290 self.inner
291 .rpc(CloseRequest {
292 doc_id: self.namespace_id,
293 })
294 .await??;
295 Ok(())
296 }
297
298 fn ensure_open(&self) -> Result<()> {
299 if self.closed.load(Ordering::Relaxed) {
300 Err(anyhow::anyhow!("document is closed"))
301 } else {
302 Ok(())
303 }
304 }
305
306 pub async fn set_bytes(
308 &self,
309 author_id: AuthorId,
310 key: impl Into<Bytes>,
311 value: impl Into<Bytes>,
312 ) -> Result<Hash> {
313 self.ensure_open()?;
314 let response = self
315 .inner
316 .rpc(SetRequest {
317 doc_id: self.namespace_id,
318 author_id,
319 key: key.into(),
320 value: value.into(),
321 })
322 .await??;
323 Ok(response.entry.content_hash())
324 }
325
326 pub async fn set_hash(
328 &self,
329 author_id: AuthorId,
330 key: impl Into<Bytes>,
331 hash: Hash,
332 size: u64,
333 ) -> Result<()> {
334 self.ensure_open()?;
335 self.inner
336 .rpc(SetHashRequest {
337 doc_id: self.namespace_id,
338 author_id,
339 key: key.into(),
340 hash,
341 size,
342 })
343 .await??;
344 Ok(())
345 }
346
347 pub async fn del(&self, author_id: AuthorId, prefix: impl Into<Bytes>) -> Result<usize> {
354 self.ensure_open()?;
355 let response = self
356 .inner
357 .rpc(DelRequest {
358 doc_id: self.namespace_id,
359 author_id,
360 prefix: prefix.into(),
361 })
362 .await??;
363 Ok(response.removed)
364 }
365
366 pub async fn get_exact(
370 &self,
371 author: AuthorId,
372 key: impl AsRef<[u8]>,
373 include_empty: bool,
374 ) -> Result<Option<Entry>> {
375 self.ensure_open()?;
376 let response = self
377 .inner
378 .rpc(GetExactRequest {
379 author,
380 key: key.as_ref().to_vec().into(),
381 doc_id: self.namespace_id,
382 include_empty,
383 })
384 .await??;
385 Ok(response.entry.map(|entry| entry.into()))
386 }
387
388 pub async fn get_many(
390 &self,
391 query: impl Into<Query>,
392 ) -> Result<impl Stream<Item = Result<Entry>>> {
393 self.ensure_open()?;
394 let stream = self
395 .inner
396 .server_streaming(
397 GetManyRequest {
398 doc_id: self.namespace_id,
399 query: query.into(),
400 },
401 64,
402 )
403 .await?;
404 Ok(stream.into_stream().map(|res| match res {
405 Err(err) => Err(err.into()),
406 Ok(Err(err)) => Err(err.into()),
407 Ok(Ok(res)) => Ok(res.into()),
408 }))
409 }
410
411 pub async fn get_one(&self, query: impl Into<Query>) -> Result<Option<Entry>> {
413 self.ensure_open()?;
414 let stream = self.get_many(query).await?;
415 tokio::pin!(stream);
416 futures_lite::StreamExt::next(&mut stream).await.transpose()
417 }
418
419 pub async fn share(&self, mode: ShareMode, addr_options: AddrInfoOptions) -> Result<DocTicket> {
421 self.ensure_open()?;
422 let response = self
423 .inner
424 .rpc(ShareRequest {
425 doc_id: self.namespace_id,
426 mode,
427 addr_options,
428 })
429 .await??;
430 Ok(response.0)
431 }
432
433 pub async fn start_sync(&self, peers: Vec<NodeAddr>) -> Result<()> {
435 self.ensure_open()?;
436 self.inner
437 .rpc(StartSyncRequest {
438 doc_id: self.namespace_id,
439 peers,
440 })
441 .await??;
442 Ok(())
443 }
444
445 pub async fn leave(&self) -> Result<()> {
447 self.ensure_open()?;
448 self.inner
449 .rpc(LeaveRequest {
450 doc_id: self.namespace_id,
451 })
452 .await??;
453 Ok(())
454 }
455
456 pub async fn subscribe(
458 &self,
459 ) -> Result<impl Stream<Item = Result<LiveEvent>> + Send + Unpin + 'static> {
460 self.ensure_open()?;
461 let stream = self
462 .inner
463 .server_streaming(
464 SubscribeRequest {
465 doc_id: self.namespace_id,
466 },
467 64,
468 )
469 .await?;
470 Ok(Box::pin(stream.into_stream().map(|res| match res {
471 Err(err) => Err(err.into()),
472 Ok(Err(err)) => Err(err.into()),
473 Ok(Ok(res)) => Ok(res.event),
474 })))
475 }
476
477 pub async fn status(&self) -> Result<OpenState> {
479 self.ensure_open()?;
480 let response = self
481 .inner
482 .rpc(StatusRequest {
483 doc_id: self.namespace_id,
484 })
485 .await??;
486 Ok(response.status)
487 }
488
489 pub async fn set_download_policy(&self, policy: DownloadPolicy) -> Result<()> {
491 self.ensure_open()?;
492 self.inner
493 .rpc(SetDownloadPolicyRequest {
494 doc_id: self.namespace_id,
495 policy,
496 })
497 .await??;
498 Ok(())
499 }
500
501 pub async fn get_download_policy(&self) -> Result<DownloadPolicy> {
503 self.ensure_open()?;
504 let response = self
505 .inner
506 .rpc(GetDownloadPolicyRequest {
507 doc_id: self.namespace_id,
508 })
509 .await??;
510 Ok(response.policy)
511 }
512
513 pub async fn get_sync_peers(&self) -> Result<Option<Vec<PeerIdBytes>>> {
515 self.ensure_open()?;
516 let response = self
517 .inner
518 .rpc(GetSyncPeersRequest {
519 doc_id: self.namespace_id,
520 })
521 .await??;
522 Ok(response.peers)
523 }
524
525 pub async fn import_file(
527 &self,
528 blobs: &iroh_blobs::api::Store,
529 author: AuthorId,
530 key: Bytes,
531 path: impl AsRef<Path>,
532 import_mode: iroh_blobs::api::blobs::ImportMode,
533 ) -> Result<ImportFileProgress> {
534 self.ensure_open()?;
535 let progress = blobs.add_path_with_opts(AddPathOptions {
536 path: path.as_ref().to_owned(),
537 format: iroh_blobs::BlobFormat::Raw,
538 mode: import_mode,
539 });
540 let stream = progress.stream().await;
541 let doc = self.clone();
542 let ctx = EntryContext {
543 doc,
544 author,
545 key,
546 size: None,
547 };
548 Ok(ImportFileProgress(ImportInner::Blobs(
549 Box::pin(stream),
550 Some(ctx),
551 )))
552 }
553
554 pub async fn export_file(
556 &self,
557 blobs: &iroh_blobs::api::Store,
558 entry: Entry,
559 path: impl AsRef<Path>,
560 mode: ExportMode,
561 ) -> Result<ExportProgress> {
562 self.ensure_open()?;
563 let hash = entry.content_hash();
564 let progress = blobs.export_with_opts(ExportOptions {
565 hash,
566 mode,
567 target: path.as_ref().to_path_buf(),
568 });
569 Ok(progress)
570 }
571}
572
573#[derive(Debug)]
574pub enum ImportFileProgressItem {
575 Error(anyhow::Error),
576 Blobs(AddProgressItem),
577 Done(ImportFileOutcome),
578}
579
580#[derive(Debug)]
581pub struct ImportFileProgress(ImportInner);
582
583#[derive(derive_more::Debug)]
584enum ImportInner {
585 #[debug("Blobs")]
586 Blobs(
587 n0_future::boxed::BoxStream<AddProgressItem>,
588 Option<EntryContext>,
589 ),
590 #[debug("Entry")]
591 Entry(n0_future::boxed::BoxFuture<Result<ImportFileOutcome>>),
592 Done,
593}
594
595struct EntryContext {
596 doc: Doc,
597 author: AuthorId,
598 key: Bytes,
599 size: Option<u64>,
600}
601
602impl Stream for ImportFileProgress {
603 type Item = ImportFileProgressItem;
604
605 fn poll_next(
606 self: Pin<&mut Self>,
607 cx: &mut std::task::Context<'_>,
608 ) -> Poll<Option<Self::Item>> {
609 let this = self.get_mut();
610 match this.0 {
611 ImportInner::Blobs(ref mut progress, ref mut context) => {
612 match ready!(progress.poll_next(cx)) {
613 Some(item) => match item {
614 AddProgressItem::Size(size) => {
615 context
616 .as_mut()
617 .expect("Size must be emitted before done")
618 .size = Some(size);
619 Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Size(
620 size,
621 ))))
622 }
623 AddProgressItem::Error(err) => {
624 *this = Self(ImportInner::Done);
625 Poll::Ready(Some(ImportFileProgressItem::Error(err.into())))
626 }
627 AddProgressItem::Done(tag) => {
628 let EntryContext {
629 doc,
630 author,
631 key,
632 size,
633 } = context
634 .take()
635 .expect("AddProgressItem::Done may be emitted only once");
636 let size = size.expect("Size must be emitted before done");
637 let hash = *tag.hash();
638 *this = Self(ImportInner::Entry(Box::pin(async move {
639 doc.set_hash(author, key.clone(), hash, size).await?;
640 Ok(ImportFileOutcome { hash, size, key })
641 })));
642 Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Done(
643 tag,
644 ))))
645 }
646 item => Poll::Ready(Some(ImportFileProgressItem::Blobs(item))),
647 },
648 None => todo!(),
649 }
650 }
651 ImportInner::Entry(ref mut fut) => {
652 let res = ready!(fut.poll(cx));
653 *this = Self(ImportInner::Done);
654 match res {
655 Ok(outcome) => Poll::Ready(Some(ImportFileProgressItem::Done(outcome))),
656 Err(err) => Poll::Ready(Some(ImportFileProgressItem::Error(err))),
657 }
658 }
659 ImportInner::Done => Poll::Ready(None),
660 }
661 }
662}
663
664impl Future for ImportFileProgress {
665 type Output = Result<ImportFileOutcome>;
666 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
667 loop {
668 match self.as_mut().poll_next(cx) {
669 Poll::Ready(Some(item)) => match item {
670 ImportFileProgressItem::Error(error) => return Poll::Ready(Err(error)),
671 ImportFileProgressItem::Blobs(_add_progress_item) => continue,
672 ImportFileProgressItem::Done(outcome) => return Poll::Ready(Ok(outcome)),
673 },
674 Poll::Ready(None) => {
675 return Poll::Ready(Err(anyhow::anyhow!(
676 "ImportFileProgress polled after completion"
677 )))
678 }
679 Poll::Pending => return Poll::Pending,
680 }
681 }
682 }
683}
684
685#[derive(Debug, Clone, PartialEq, Eq)]
687pub struct ImportFileOutcome {
688 pub hash: Hash,
690 pub size: u64,
692 pub key: Bytes,
694}