1#![allow(missing_docs)]
4
5use std::{
6 future::Future,
7 path::Path,
8 pin::Pin,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13 task::{ready, Poll},
14};
15
16use anyhow::Result;
17use bytes::Bytes;
18use iroh::EndpointAddr;
19use iroh_blobs::{
20 api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress},
21 Hash,
22};
23use n0_future::{FutureExt, Stream, StreamExt};
24
25use self::{
26 actor::RpcActor,
27 protocol::{
28 AddrInfoOptions, AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest,
29 AuthorGetDefaultRequest, AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest,
30 CloseRequest, CreateRequest, DelRequest, DocsProtocol, DropRequest,
31 GetDownloadPolicyRequest, GetExactRequest, GetManyRequest, GetSyncPeersRequest,
32 ImportRequest, LeaveRequest, ListRequest, OpenRequest, SetDownloadPolicyRequest,
33 SetHashRequest, SetRequest, ShareMode, ShareRequest, StartSyncRequest, StatusRequest,
34 SubscribeRequest,
35 },
36};
37use crate::{
38 actor::OpenState,
39 engine::{Engine, LiveEvent},
40 store::{DownloadPolicy, Query},
41 Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes,
42};
43
44pub(crate) mod actor;
45pub mod protocol;
46
47pub type RpcError = serde_error::Error;
48pub type RpcResult<T> = std::result::Result<T, RpcError>;
49
50type Client = irpc::Client<DocsProtocol>;
51
52#[derive(Debug, Clone)]
54pub struct DocsApi {
55 pub(crate) inner: Client,
56}
57
58impl DocsApi {
59 pub fn spawn(engine: Arc<Engine>) -> Self {
61 RpcActor::spawn(engine)
62 }
63
64 #[cfg(feature = "rpc")]
66 pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Result<DocsApi> {
67 Ok(DocsApi {
68 inner: Client::quinn(endpoint, addr),
69 })
70 }
71
72 #[cfg(feature = "rpc")]
74 pub fn listen(
75 &self,
76 endpoint: quinn::Endpoint,
77 ) -> Result<n0_future::task::AbortOnDropHandle<()>> {
78 use anyhow::Context;
79 let local = self
80 .inner
81 .as_local()
82 .context("cannot listen on remote API")?;
83 let handler: irpc::rpc::Handler<DocsProtocol> = Arc::new(move |msg, _rx, tx| {
84 let local = local.clone();
85 Box::pin(async move {
86 match msg {
87 DocsProtocol::Open(msg) => local.send((msg, tx)).await,
88 DocsProtocol::Close(msg) => local.send((msg, tx)).await,
89 DocsProtocol::Status(msg) => local.send((msg, tx)).await,
90 DocsProtocol::List(msg) => local.send((msg, tx)).await,
91 DocsProtocol::Create(msg) => local.send((msg, tx)).await,
92 DocsProtocol::Drop(msg) => local.send((msg, tx)).await,
93 DocsProtocol::Import(msg) => local.send((msg, tx)).await,
94 DocsProtocol::Set(msg) => local.send((msg, tx)).await,
95 DocsProtocol::SetHash(msg) => local.send((msg, tx)).await,
96 DocsProtocol::Get(msg) => local.send((msg, tx)).await,
97 DocsProtocol::GetExact(msg) => local.send((msg, tx)).await,
98 DocsProtocol::Del(msg) => local.send((msg, tx)).await,
101 DocsProtocol::StartSync(msg) => local.send((msg, tx)).await,
102 DocsProtocol::Leave(msg) => local.send((msg, tx)).await,
103 DocsProtocol::Share(msg) => local.send((msg, tx)).await,
104 DocsProtocol::Subscribe(msg) => local.send((msg, tx)).await,
105 DocsProtocol::GetDownloadPolicy(msg) => local.send((msg, tx)).await,
106 DocsProtocol::SetDownloadPolicy(msg) => local.send((msg, tx)).await,
107 DocsProtocol::GetSyncPeers(msg) => local.send((msg, tx)).await,
108 DocsProtocol::AuthorList(msg) => local.send((msg, tx)).await,
109 DocsProtocol::AuthorCreate(msg) => local.send((msg, tx)).await,
110 DocsProtocol::AuthorGetDefault(msg) => local.send((msg, tx)).await,
111 DocsProtocol::AuthorSetDefault(msg) => local.send((msg, tx)).await,
112 DocsProtocol::AuthorImport(msg) => local.send((msg, tx)).await,
113 DocsProtocol::AuthorExport(msg) => local.send((msg, tx)).await,
114 DocsProtocol::AuthorDelete(msg) => local.send((msg, tx)).await,
115 }
116 })
117 });
118 let join_handle = n0_future::task::spawn(irpc::rpc::listen(endpoint, handler));
119 Ok(n0_future::task::AbortOnDropHandle::new(join_handle))
120 }
121
122 pub async fn author_create(&self) -> Result<AuthorId> {
129 let response = self.inner.rpc(AuthorCreateRequest).await??;
130 Ok(response.author_id)
131 }
132
133 pub async fn author_default(&self) -> Result<AuthorId> {
140 let response = self.inner.rpc(AuthorGetDefaultRequest).await??;
141 Ok(response.author_id)
142 }
143
144 pub async fn author_set_default(&self, author_id: AuthorId) -> Result<()> {
151 self.inner
152 .rpc(AuthorSetDefaultRequest { author_id })
153 .await??;
154 Ok(())
155 }
156
157 pub async fn author_list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
161 let stream = self.inner.server_streaming(AuthorListRequest, 64).await?;
162 Ok(stream.into_stream().map(|res| match res {
163 Err(err) => Err(err.into()),
164 Ok(Err(err)) => Err(err.into()),
165 Ok(Ok(res)) => Ok(res.author_id),
166 }))
167 }
168
169 pub async fn author_export(&self, author: AuthorId) -> Result<Option<Author>> {
173 let response = self.inner.rpc(AuthorExportRequest { author }).await??;
174 Ok(response.author)
175 }
176
177 pub async fn author_import(&self, author: Author) -> Result<()> {
181 self.inner.rpc(AuthorImportRequest { author }).await??;
182 Ok(())
183 }
184
185 pub async fn author_delete(&self, author: AuthorId) -> Result<()> {
191 self.inner.rpc(AuthorDeleteRequest { author }).await??;
192 Ok(())
193 }
194
195 pub async fn create(&self) -> Result<Doc> {
197 let response = self.inner.rpc(CreateRequest).await??;
198 Ok(Doc::new(self.inner.clone(), response.id))
199 }
200
201 pub async fn drop_doc(&self, doc_id: NamespaceId) -> Result<()> {
207 self.inner.rpc(DropRequest { doc_id }).await??;
208 Ok(())
209 }
210
211 pub async fn import_namespace(&self, capability: Capability) -> Result<Doc> {
215 let response = self.inner.rpc(ImportRequest { capability }).await??;
216 Ok(Doc::new(self.inner.clone(), response.doc_id))
217 }
218
219 pub async fn import(&self, ticket: DocTicket) -> Result<Doc> {
221 let DocTicket { capability, nodes } = ticket;
222 let doc = self.import_namespace(capability).await?;
223 doc.start_sync(nodes).await?;
224 Ok(doc)
225 }
226
227 pub async fn import_and_subscribe(
234 &self,
235 ticket: DocTicket,
236 ) -> Result<(Doc, impl Stream<Item = Result<LiveEvent>>)> {
237 let DocTicket { capability, nodes } = ticket;
238 let response = self.inner.rpc(ImportRequest { capability }).await??;
239 let doc = Doc::new(self.inner.clone(), response.doc_id);
240 let events = doc.subscribe().await?;
241 doc.start_sync(nodes).await?;
242 Ok((doc, events))
243 }
244
245 pub async fn list(
247 &self,
248 ) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>> + Unpin + Send + 'static>
249 {
250 let stream = self.inner.server_streaming(ListRequest, 64).await?;
251 let stream = Box::pin(stream.into_stream());
252 Ok(stream.map(|res| match res {
253 Err(err) => Err(err.into()),
254 Ok(Err(err)) => Err(err.into()),
255 Ok(Ok(res)) => Ok((res.id, res.capability)),
256 }))
257 }
258
259 pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc>> {
263 self.inner.rpc(OpenRequest { doc_id: id }).await??;
264 Ok(Some(Doc::new(self.inner.clone(), id)))
265 }
266}
267
268#[derive(Debug, Clone)]
270pub struct Doc {
271 inner: Client,
272 namespace_id: NamespaceId,
273 closed: Arc<AtomicBool>,
274}
275
276impl Doc {
277 fn new(inner: Client, namespace_id: NamespaceId) -> Self {
278 Self {
279 inner,
280 namespace_id,
281 closed: Default::default(),
282 }
283 }
284
285 pub fn id(&self) -> NamespaceId {
287 self.namespace_id
288 }
289
290 pub async fn close(&self) -> Result<()> {
292 self.closed.store(true, Ordering::Relaxed);
293 self.inner
294 .rpc(CloseRequest {
295 doc_id: self.namespace_id,
296 })
297 .await??;
298 Ok(())
299 }
300
301 fn ensure_open(&self) -> Result<()> {
302 if self.closed.load(Ordering::Relaxed) {
303 Err(anyhow::anyhow!("document is closed"))
304 } else {
305 Ok(())
306 }
307 }
308
309 pub async fn set_bytes(
311 &self,
312 author_id: AuthorId,
313 key: impl Into<Bytes>,
314 value: impl Into<Bytes>,
315 ) -> Result<Hash> {
316 self.ensure_open()?;
317 let response = self
318 .inner
319 .rpc(SetRequest {
320 doc_id: self.namespace_id,
321 author_id,
322 key: key.into(),
323 value: value.into(),
324 })
325 .await??;
326 Ok(response.entry.content_hash())
327 }
328
329 pub async fn set_hash(
331 &self,
332 author_id: AuthorId,
333 key: impl Into<Bytes>,
334 hash: Hash,
335 size: u64,
336 ) -> Result<()> {
337 self.ensure_open()?;
338 self.inner
339 .rpc(SetHashRequest {
340 doc_id: self.namespace_id,
341 author_id,
342 key: key.into(),
343 hash,
344 size,
345 })
346 .await??;
347 Ok(())
348 }
349
350 pub async fn del(&self, author_id: AuthorId, prefix: impl Into<Bytes>) -> Result<usize> {
357 self.ensure_open()?;
358 let response = self
359 .inner
360 .rpc(DelRequest {
361 doc_id: self.namespace_id,
362 author_id,
363 prefix: prefix.into(),
364 })
365 .await??;
366 Ok(response.removed)
367 }
368
369 pub async fn get_exact(
373 &self,
374 author: AuthorId,
375 key: impl AsRef<[u8]>,
376 include_empty: bool,
377 ) -> Result<Option<Entry>> {
378 self.ensure_open()?;
379 let response = self
380 .inner
381 .rpc(GetExactRequest {
382 author,
383 key: key.as_ref().to_vec().into(),
384 doc_id: self.namespace_id,
385 include_empty,
386 })
387 .await??;
388 Ok(response.entry.map(|entry| entry.into()))
389 }
390
391 pub async fn get_many(
393 &self,
394 query: impl Into<Query>,
395 ) -> Result<impl Stream<Item = Result<Entry>>> {
396 self.ensure_open()?;
397 let stream = self
398 .inner
399 .server_streaming(
400 GetManyRequest {
401 doc_id: self.namespace_id,
402 query: query.into(),
403 },
404 64,
405 )
406 .await?;
407 Ok(stream.into_stream().map(|res| match res {
408 Err(err) => Err(err.into()),
409 Ok(Err(err)) => Err(err.into()),
410 Ok(Ok(res)) => Ok(res.into()),
411 }))
412 }
413
414 pub async fn get_one(&self, query: impl Into<Query>) -> Result<Option<Entry>> {
416 self.ensure_open()?;
417 let stream = self.get_many(query).await?;
418 tokio::pin!(stream);
419 futures_lite::StreamExt::next(&mut stream).await.transpose()
420 }
421
422 pub async fn share(&self, mode: ShareMode, addr_options: AddrInfoOptions) -> Result<DocTicket> {
424 self.ensure_open()?;
425 let response = self
426 .inner
427 .rpc(ShareRequest {
428 doc_id: self.namespace_id,
429 mode,
430 addr_options,
431 })
432 .await??;
433 Ok(response.0)
434 }
435
436 pub async fn start_sync(&self, peers: Vec<EndpointAddr>) -> Result<()> {
438 self.ensure_open()?;
439 self.inner
440 .rpc(StartSyncRequest {
441 doc_id: self.namespace_id,
442 peers,
443 })
444 .await??;
445 Ok(())
446 }
447
448 pub async fn leave(&self) -> Result<()> {
450 self.ensure_open()?;
451 self.inner
452 .rpc(LeaveRequest {
453 doc_id: self.namespace_id,
454 })
455 .await??;
456 Ok(())
457 }
458
459 pub async fn subscribe(
461 &self,
462 ) -> Result<impl Stream<Item = Result<LiveEvent>> + Send + Unpin + 'static> {
463 self.ensure_open()?;
464 let stream = self
465 .inner
466 .server_streaming(
467 SubscribeRequest {
468 doc_id: self.namespace_id,
469 },
470 64,
471 )
472 .await?;
473 Ok(Box::pin(stream.into_stream().map(|res| match res {
474 Err(err) => Err(err.into()),
475 Ok(Err(err)) => Err(err.into()),
476 Ok(Ok(res)) => Ok(res.event),
477 })))
478 }
479
480 pub async fn status(&self) -> Result<OpenState> {
482 self.ensure_open()?;
483 let response = self
484 .inner
485 .rpc(StatusRequest {
486 doc_id: self.namespace_id,
487 })
488 .await??;
489 Ok(response.status)
490 }
491
492 pub async fn set_download_policy(&self, policy: DownloadPolicy) -> Result<()> {
494 self.ensure_open()?;
495 self.inner
496 .rpc(SetDownloadPolicyRequest {
497 doc_id: self.namespace_id,
498 policy,
499 })
500 .await??;
501 Ok(())
502 }
503
504 pub async fn get_download_policy(&self) -> Result<DownloadPolicy> {
506 self.ensure_open()?;
507 let response = self
508 .inner
509 .rpc(GetDownloadPolicyRequest {
510 doc_id: self.namespace_id,
511 })
512 .await??;
513 Ok(response.policy)
514 }
515
516 pub async fn get_sync_peers(&self) -> Result<Option<Vec<PeerIdBytes>>> {
518 self.ensure_open()?;
519 let response = self
520 .inner
521 .rpc(GetSyncPeersRequest {
522 doc_id: self.namespace_id,
523 })
524 .await??;
525 Ok(response.peers)
526 }
527
528 pub async fn import_file(
530 &self,
531 blobs: &iroh_blobs::api::Store,
532 author: AuthorId,
533 key: Bytes,
534 path: impl AsRef<Path>,
535 import_mode: iroh_blobs::api::blobs::ImportMode,
536 ) -> Result<ImportFileProgress> {
537 self.ensure_open()?;
538 let progress = blobs.add_path_with_opts(AddPathOptions {
539 path: path.as_ref().to_owned(),
540 format: iroh_blobs::BlobFormat::Raw,
541 mode: import_mode,
542 });
543 let stream = progress.stream().await;
544 let doc = self.clone();
545 let ctx = EntryContext {
546 doc,
547 author,
548 key,
549 size: None,
550 };
551 Ok(ImportFileProgress(ImportInner::Blobs(
552 Box::pin(stream),
553 Some(ctx),
554 )))
555 }
556
557 pub async fn export_file(
559 &self,
560 blobs: &iroh_blobs::api::Store,
561 entry: Entry,
562 path: impl AsRef<Path>,
563 mode: ExportMode,
564 ) -> Result<ExportProgress> {
565 self.ensure_open()?;
566 let hash = entry.content_hash();
567 let progress = blobs.export_with_opts(ExportOptions {
568 hash,
569 mode,
570 target: path.as_ref().to_path_buf(),
571 });
572 Ok(progress)
573 }
574}
575
576#[derive(Debug)]
577pub enum ImportFileProgressItem {
578 Error(anyhow::Error),
579 Blobs(AddProgressItem),
580 Done(ImportFileOutcome),
581}
582
583#[derive(Debug)]
584pub struct ImportFileProgress(ImportInner);
585
586#[derive(derive_more::Debug)]
587enum ImportInner {
588 #[debug("Blobs")]
589 Blobs(
590 n0_future::boxed::BoxStream<AddProgressItem>,
591 Option<EntryContext>,
592 ),
593 #[debug("Entry")]
594 Entry(n0_future::boxed::BoxFuture<Result<ImportFileOutcome>>),
595 Done,
596}
597
598struct EntryContext {
599 doc: Doc,
600 author: AuthorId,
601 key: Bytes,
602 size: Option<u64>,
603}
604
605impl Stream for ImportFileProgress {
606 type Item = ImportFileProgressItem;
607
608 fn poll_next(
609 self: Pin<&mut Self>,
610 cx: &mut std::task::Context<'_>,
611 ) -> Poll<Option<Self::Item>> {
612 let this = self.get_mut();
613 match this.0 {
614 ImportInner::Blobs(ref mut progress, ref mut context) => {
615 match ready!(progress.poll_next(cx)) {
616 Some(item) => match item {
617 AddProgressItem::Size(size) => {
618 context
619 .as_mut()
620 .expect("Size must be emitted before done")
621 .size = Some(size);
622 Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Size(
623 size,
624 ))))
625 }
626 AddProgressItem::Error(err) => {
627 *this = Self(ImportInner::Done);
628 Poll::Ready(Some(ImportFileProgressItem::Error(err.into())))
629 }
630 AddProgressItem::Done(tag) => {
631 let EntryContext {
632 doc,
633 author,
634 key,
635 size,
636 } = context
637 .take()
638 .expect("AddProgressItem::Done may be emitted only once");
639 let size = size.expect("Size must be emitted before done");
640 let hash = tag.hash();
641 *this = Self(ImportInner::Entry(Box::pin(async move {
642 doc.set_hash(author, key.clone(), hash, size).await?;
643 Ok(ImportFileOutcome { hash, size, key })
644 })));
645 Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Done(
646 tag,
647 ))))
648 }
649 item => Poll::Ready(Some(ImportFileProgressItem::Blobs(item))),
650 },
651 None => todo!(),
652 }
653 }
654 ImportInner::Entry(ref mut fut) => {
655 let res = ready!(fut.poll(cx));
656 *this = Self(ImportInner::Done);
657 match res {
658 Ok(outcome) => Poll::Ready(Some(ImportFileProgressItem::Done(outcome))),
659 Err(err) => Poll::Ready(Some(ImportFileProgressItem::Error(err))),
660 }
661 }
662 ImportInner::Done => Poll::Ready(None),
663 }
664 }
665}
666
667impl Future for ImportFileProgress {
668 type Output = Result<ImportFileOutcome>;
669 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
670 loop {
671 match self.as_mut().poll_next(cx) {
672 Poll::Ready(Some(item)) => match item {
673 ImportFileProgressItem::Error(error) => return Poll::Ready(Err(error)),
674 ImportFileProgressItem::Blobs(_add_progress_item) => continue,
675 ImportFileProgressItem::Done(outcome) => return Poll::Ready(Ok(outcome)),
676 },
677 Poll::Ready(None) => {
678 return Poll::Ready(Err(anyhow::anyhow!(
679 "ImportFileProgress polled after completion"
680 )))
681 }
682 Poll::Pending => return Poll::Pending,
683 }
684 }
685 }
686}
687
688#[derive(Debug, Clone, PartialEq, Eq)]
690pub struct ImportFileOutcome {
691 pub hash: Hash,
693 pub size: u64,
695 pub key: Bytes,
697}