iroh_docs/
api.rs

1//! irpc-based RPC implementation for docs.
2
3#![allow(missing_docs)]
4
5use std::{
6    future::Future,
7    net::SocketAddr,
8    path::Path,
9    pin::Pin,
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Arc,
13    },
14    task::{ready, Poll},
15};
16
17use anyhow::{Context, Result};
18use bytes::Bytes;
19use iroh::NodeAddr;
20use iroh_blobs::{
21    api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress},
22    Hash,
23};
24use irpc::rpc::Handler;
25use n0_future::{
26    task::{self, AbortOnDropHandle},
27    FutureExt, Stream, StreamExt,
28};
29
30use self::{
31    actor::RpcActor,
32    protocol::{
33        AddrInfoOptions, AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest,
34        AuthorGetDefaultRequest, AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest,
35        CloseRequest, CreateRequest, DelRequest, DocsProtocol, DropRequest,
36        GetDownloadPolicyRequest, GetExactRequest, GetManyRequest, GetSyncPeersRequest,
37        ImportRequest, LeaveRequest, ListRequest, OpenRequest, SetDownloadPolicyRequest,
38        SetHashRequest, SetRequest, ShareMode, ShareRequest, StartSyncRequest, StatusRequest,
39        SubscribeRequest,
40    },
41};
42use crate::{
43    actor::OpenState,
44    engine::{Engine, LiveEvent},
45    store::{DownloadPolicy, Query},
46    Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes,
47};
48
49pub(crate) mod actor;
50pub mod protocol;
51
52pub type RpcError = serde_error::Error;
53pub type RpcResult<T> = std::result::Result<T, RpcError>;
54
55type Client = irpc::Client<DocsProtocol>;
56
57/// API wrapper for the docs service
58#[derive(Debug, Clone)]
59pub struct DocsApi {
60    pub(crate) inner: Client,
61}
62
63impl DocsApi {
64    /// Create a new docs API from an engine
65    pub fn spawn(engine: Arc<Engine>) -> Self {
66        RpcActor::spawn(engine)
67    }
68
69    /// Connect to a remote docs service
70    pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Result<DocsApi> {
71        Ok(DocsApi {
72            inner: Client::quinn(endpoint, addr),
73        })
74    }
75
76    /// Listen for incoming RPC connections
77    pub fn listen(&self, endpoint: quinn::Endpoint) -> Result<AbortOnDropHandle<()>> {
78        let local = self
79            .inner
80            .as_local()
81            .context("cannot listen on remote API")?;
82        let handler: Handler<DocsProtocol> = Arc::new(move |msg, _rx, tx| {
83            let local = local.clone();
84            Box::pin(match msg {
85                DocsProtocol::Open(msg) => local.send((msg, tx)),
86                DocsProtocol::Close(msg) => local.send((msg, tx)),
87                DocsProtocol::Status(msg) => local.send((msg, tx)),
88                DocsProtocol::List(msg) => local.send((msg, tx)),
89                DocsProtocol::Create(msg) => local.send((msg, tx)),
90                DocsProtocol::Drop(msg) => local.send((msg, tx)),
91                DocsProtocol::Import(msg) => local.send((msg, tx)),
92                DocsProtocol::Set(msg) => local.send((msg, tx)),
93                DocsProtocol::SetHash(msg) => local.send((msg, tx)),
94                DocsProtocol::Get(msg) => local.send((msg, tx)),
95                DocsProtocol::GetExact(msg) => local.send((msg, tx)),
96                // DocsProtocol::ImportFile(msg) => local.send((msg, tx)),
97                // DocsProtocol::ExportFile(msg) => local.send((msg, tx)),
98                DocsProtocol::Del(msg) => local.send((msg, tx)),
99                DocsProtocol::StartSync(msg) => local.send((msg, tx)),
100                DocsProtocol::Leave(msg) => local.send((msg, tx)),
101                DocsProtocol::Share(msg) => local.send((msg, tx)),
102                DocsProtocol::Subscribe(msg) => local.send((msg, tx)),
103                DocsProtocol::GetDownloadPolicy(msg) => local.send((msg, tx)),
104                DocsProtocol::SetDownloadPolicy(msg) => local.send((msg, tx)),
105                DocsProtocol::GetSyncPeers(msg) => local.send((msg, tx)),
106                DocsProtocol::AuthorList(msg) => local.send((msg, tx)),
107                DocsProtocol::AuthorCreate(msg) => local.send((msg, tx)),
108                DocsProtocol::AuthorGetDefault(msg) => local.send((msg, tx)),
109                DocsProtocol::AuthorSetDefault(msg) => local.send((msg, tx)),
110                DocsProtocol::AuthorImport(msg) => local.send((msg, tx)),
111                DocsProtocol::AuthorExport(msg) => local.send((msg, tx)),
112                DocsProtocol::AuthorDelete(msg) => local.send((msg, tx)),
113            })
114        });
115        let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler));
116        Ok(AbortOnDropHandle::new(join_handle))
117    }
118
119    /// Creates a new document author.
120    ///
121    /// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author
122    /// again.
123    ///
124    /// If you need only a single author, use [`Self::author_default`].
125    pub async fn author_create(&self) -> Result<AuthorId> {
126        let response = self.inner.rpc(AuthorCreateRequest).await??;
127        Ok(response.author_id)
128    }
129
130    /// Returns the default document author of this node.
131    ///
132    /// On persistent nodes, the author is created on first start and its public key is saved
133    /// in the data directory.
134    ///
135    /// The default author can be set with [`Self::author_set_default`].
136    pub async fn author_default(&self) -> Result<AuthorId> {
137        let response = self.inner.rpc(AuthorGetDefaultRequest).await??;
138        Ok(response.author_id)
139    }
140
141    /// Sets the node-wide default author.
142    ///
143    /// If the author does not exist, an error is returned.
144    ///
145    /// On a persistent node, the author id will be saved to a file in the data directory and
146    /// reloaded after a restart.
147    pub async fn author_set_default(&self, author_id: AuthorId) -> Result<()> {
148        self.inner
149            .rpc(AuthorSetDefaultRequest { author_id })
150            .await??;
151        Ok(())
152    }
153
154    /// Lists document authors for which we have a secret key.
155    ///
156    /// It's only possible to create writes from authors that we have the secret key of.
157    pub async fn author_list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
158        let stream = self.inner.server_streaming(AuthorListRequest, 64).await?;
159        Ok(stream.into_stream().map(|res| match res {
160            Err(err) => Err(err.into()),
161            Ok(Err(err)) => Err(err.into()),
162            Ok(Ok(res)) => Ok(res.author_id),
163        }))
164    }
165
166    /// Exports the given author.
167    ///
168    /// Warning: The [`Author`] struct contains sensitive data.
169    pub async fn author_export(&self, author: AuthorId) -> Result<Option<Author>> {
170        let response = self.inner.rpc(AuthorExportRequest { author }).await??;
171        Ok(response.author)
172    }
173
174    /// Imports the given author.
175    ///
176    /// Warning: The [`Author`] struct contains sensitive data.
177    pub async fn author_import(&self, author: Author) -> Result<()> {
178        self.inner.rpc(AuthorImportRequest { author }).await??;
179        Ok(())
180    }
181
182    /// Deletes the given author by id.
183    ///
184    /// Warning: This permanently removes this author.
185    ///
186    /// Returns an error if attempting to delete the default author.
187    pub async fn author_delete(&self, author: AuthorId) -> Result<()> {
188        self.inner.rpc(AuthorDeleteRequest { author }).await??;
189        Ok(())
190    }
191
192    /// Creates a new document.
193    pub async fn create(&self) -> Result<Doc> {
194        let response = self.inner.rpc(CreateRequest).await??;
195        Ok(Doc::new(self.inner.clone(), response.id))
196    }
197
198    /// Deletes a document from the local node.
199    ///
200    /// This is a destructive operation. Both the document secret key and all entries in the
201    /// document will be permanently deleted from the node's storage. Content blobs will be deleted
202    /// through garbage collection unless they are referenced from another document or tag.
203    pub async fn drop_doc(&self, doc_id: NamespaceId) -> Result<()> {
204        self.inner.rpc(DropRequest { doc_id }).await??;
205        Ok(())
206    }
207
208    /// Imports a document from a namespace capability.
209    ///
210    /// This does not start sync automatically. Use [`Doc::start_sync`] to start sync.
211    pub async fn import_namespace(&self, capability: Capability) -> Result<Doc> {
212        let response = self.inner.rpc(ImportRequest { capability }).await??;
213        Ok(Doc::new(self.inner.clone(), response.doc_id))
214    }
215
216    /// Imports a document from a ticket and joins all peers in the ticket.
217    pub async fn import(&self, ticket: DocTicket) -> Result<Doc> {
218        let DocTicket { capability, nodes } = ticket;
219        let doc = self.import_namespace(capability).await?;
220        doc.start_sync(nodes).await?;
221        Ok(doc)
222    }
223
224    /// Imports a document from a ticket, creates a subscription stream and joins all peers in the ticket.
225    ///
226    /// Returns the [`Doc`] and a [`Stream`] of [`LiveEvent`]s.
227    ///
228    /// The subscription stream is created before the sync is started, so the first call to this
229    /// method after starting the node is guaranteed to not miss any sync events.
230    pub async fn import_and_subscribe(
231        &self,
232        ticket: DocTicket,
233    ) -> Result<(Doc, impl Stream<Item = Result<LiveEvent>>)> {
234        let DocTicket { capability, nodes } = ticket;
235        let response = self.inner.rpc(ImportRequest { capability }).await??;
236        let doc = Doc::new(self.inner.clone(), response.doc_id);
237        let events = doc.subscribe().await?;
238        doc.start_sync(nodes).await?;
239        Ok((doc, events))
240    }
241
242    /// Lists all documents.
243    pub async fn list(
244        &self,
245    ) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>> + Unpin + Send + 'static>
246    {
247        let stream = self.inner.server_streaming(ListRequest, 64).await?;
248        let stream = Box::pin(stream.into_stream());
249        Ok(stream.map(|res| match res {
250            Err(err) => Err(err.into()),
251            Ok(Err(err)) => Err(err.into()),
252            Ok(Ok(res)) => Ok((res.id, res.capability)),
253        }))
254    }
255
256    /// Returns a [`Doc`] client for a single document.
257    ///
258    /// Returns None if the document cannot be found.
259    pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc>> {
260        self.inner.rpc(OpenRequest { doc_id: id }).await??;
261        Ok(Some(Doc::new(self.inner.clone(), id)))
262    }
263}
264
265/// Document handle
266#[derive(Debug, Clone)]
267pub struct Doc {
268    inner: Client,
269    namespace_id: NamespaceId,
270    closed: Arc<AtomicBool>,
271}
272
273impl Doc {
274    fn new(inner: Client, namespace_id: NamespaceId) -> Self {
275        Self {
276            inner,
277            namespace_id,
278            closed: Default::default(),
279        }
280    }
281
282    /// Returns the document id of this doc.
283    pub fn id(&self) -> NamespaceId {
284        self.namespace_id
285    }
286
287    /// Closes the document.
288    pub async fn close(&self) -> Result<()> {
289        self.closed.store(true, Ordering::Relaxed);
290        self.inner
291            .rpc(CloseRequest {
292                doc_id: self.namespace_id,
293            })
294            .await??;
295        Ok(())
296    }
297
298    fn ensure_open(&self) -> Result<()> {
299        if self.closed.load(Ordering::Relaxed) {
300            Err(anyhow::anyhow!("document is closed"))
301        } else {
302            Ok(())
303        }
304    }
305
306    /// Sets the content of a key to a byte array.
307    pub async fn set_bytes(
308        &self,
309        author_id: AuthorId,
310        key: impl Into<Bytes>,
311        value: impl Into<Bytes>,
312    ) -> Result<Hash> {
313        self.ensure_open()?;
314        let response = self
315            .inner
316            .rpc(SetRequest {
317                doc_id: self.namespace_id,
318                author_id,
319                key: key.into(),
320                value: value.into(),
321            })
322            .await??;
323        Ok(response.entry.content_hash())
324    }
325
326    /// Sets an entry on the doc via its key, hash, and size.
327    pub async fn set_hash(
328        &self,
329        author_id: AuthorId,
330        key: impl Into<Bytes>,
331        hash: Hash,
332        size: u64,
333    ) -> Result<()> {
334        self.ensure_open()?;
335        self.inner
336            .rpc(SetHashRequest {
337                doc_id: self.namespace_id,
338                author_id,
339                key: key.into(),
340                hash,
341                size,
342            })
343            .await??;
344        Ok(())
345    }
346
347    /// Deletes entries that match the given `author` and key `prefix`.
348    ///
349    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
350    /// entries whose key starts with or is equal to the given `prefix`.
351    ///
352    /// Returns the number of entries deleted.
353    pub async fn del(&self, author_id: AuthorId, prefix: impl Into<Bytes>) -> Result<usize> {
354        self.ensure_open()?;
355        let response = self
356            .inner
357            .rpc(DelRequest {
358                doc_id: self.namespace_id,
359                author_id,
360                prefix: prefix.into(),
361            })
362            .await??;
363        Ok(response.removed)
364    }
365
366    /// Returns an entry for a key and author.
367    ///
368    /// Optionally also returns the entry unless it is empty (i.e. a deletion marker).
369    pub async fn get_exact(
370        &self,
371        author: AuthorId,
372        key: impl AsRef<[u8]>,
373        include_empty: bool,
374    ) -> Result<Option<Entry>> {
375        self.ensure_open()?;
376        let response = self
377            .inner
378            .rpc(GetExactRequest {
379                author,
380                key: key.as_ref().to_vec().into(),
381                doc_id: self.namespace_id,
382                include_empty,
383            })
384            .await??;
385        Ok(response.entry.map(|entry| entry.into()))
386    }
387
388    /// Returns all entries matching the query.
389    pub async fn get_many(
390        &self,
391        query: impl Into<Query>,
392    ) -> Result<impl Stream<Item = Result<Entry>>> {
393        self.ensure_open()?;
394        let stream = self
395            .inner
396            .server_streaming(
397                GetManyRequest {
398                    doc_id: self.namespace_id,
399                    query: query.into(),
400                },
401                64,
402            )
403            .await?;
404        Ok(stream.into_stream().map(|res| match res {
405            Err(err) => Err(err.into()),
406            Ok(Err(err)) => Err(err.into()),
407            Ok(Ok(res)) => Ok(res.into()),
408        }))
409    }
410
411    /// Returns a single entry.
412    pub async fn get_one(&self, query: impl Into<Query>) -> Result<Option<Entry>> {
413        self.ensure_open()?;
414        let stream = self.get_many(query).await?;
415        tokio::pin!(stream);
416        futures_lite::StreamExt::next(&mut stream).await.transpose()
417    }
418
419    /// Shares this document with peers over a ticket.
420    pub async fn share(&self, mode: ShareMode, addr_options: AddrInfoOptions) -> Result<DocTicket> {
421        self.ensure_open()?;
422        let response = self
423            .inner
424            .rpc(ShareRequest {
425                doc_id: self.namespace_id,
426                mode,
427                addr_options,
428            })
429            .await??;
430        Ok(response.0)
431    }
432
433    /// Starts to sync this document with a list of peers.
434    pub async fn start_sync(&self, peers: Vec<NodeAddr>) -> Result<()> {
435        self.ensure_open()?;
436        self.inner
437            .rpc(StartSyncRequest {
438                doc_id: self.namespace_id,
439                peers,
440            })
441            .await??;
442        Ok(())
443    }
444
445    /// Stops the live sync for this document.
446    pub async fn leave(&self) -> Result<()> {
447        self.ensure_open()?;
448        self.inner
449            .rpc(LeaveRequest {
450                doc_id: self.namespace_id,
451            })
452            .await??;
453        Ok(())
454    }
455
456    /// Subscribes to events for this document.
457    pub async fn subscribe(
458        &self,
459    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Send + Unpin + 'static> {
460        self.ensure_open()?;
461        let stream = self
462            .inner
463            .server_streaming(
464                SubscribeRequest {
465                    doc_id: self.namespace_id,
466                },
467                64,
468            )
469            .await?;
470        Ok(Box::pin(stream.into_stream().map(|res| match res {
471            Err(err) => Err(err.into()),
472            Ok(Err(err)) => Err(err.into()),
473            Ok(Ok(res)) => Ok(res.event),
474        })))
475    }
476
477    /// Returns status info for this document
478    pub async fn status(&self) -> Result<OpenState> {
479        self.ensure_open()?;
480        let response = self
481            .inner
482            .rpc(StatusRequest {
483                doc_id: self.namespace_id,
484            })
485            .await??;
486        Ok(response.status)
487    }
488
489    /// Sets the download policy for this document
490    pub async fn set_download_policy(&self, policy: DownloadPolicy) -> Result<()> {
491        self.ensure_open()?;
492        self.inner
493            .rpc(SetDownloadPolicyRequest {
494                doc_id: self.namespace_id,
495                policy,
496            })
497            .await??;
498        Ok(())
499    }
500
501    /// Returns the download policy for this document
502    pub async fn get_download_policy(&self) -> Result<DownloadPolicy> {
503        self.ensure_open()?;
504        let response = self
505            .inner
506            .rpc(GetDownloadPolicyRequest {
507                doc_id: self.namespace_id,
508            })
509            .await??;
510        Ok(response.policy)
511    }
512
513    /// Returns sync peers for this document
514    pub async fn get_sync_peers(&self) -> Result<Option<Vec<PeerIdBytes>>> {
515        self.ensure_open()?;
516        let response = self
517            .inner
518            .rpc(GetSyncPeersRequest {
519                doc_id: self.namespace_id,
520            })
521            .await??;
522        Ok(response.peers)
523    }
524
525    /// Adds an entry from an absolute file path
526    pub async fn import_file(
527        &self,
528        blobs: &iroh_blobs::api::Store,
529        author: AuthorId,
530        key: Bytes,
531        path: impl AsRef<Path>,
532        import_mode: iroh_blobs::api::blobs::ImportMode,
533    ) -> Result<ImportFileProgress> {
534        self.ensure_open()?;
535        let progress = blobs.add_path_with_opts(AddPathOptions {
536            path: path.as_ref().to_owned(),
537            format: iroh_blobs::BlobFormat::Raw,
538            mode: import_mode,
539        });
540        let stream = progress.stream().await;
541        let doc = self.clone();
542        let ctx = EntryContext {
543            doc,
544            author,
545            key,
546            size: None,
547        };
548        Ok(ImportFileProgress(ImportInner::Blobs(
549            Box::pin(stream),
550            Some(ctx),
551        )))
552    }
553
554    /// Exports an entry as a file to a given absolute path.
555    pub async fn export_file(
556        &self,
557        blobs: &iroh_blobs::api::Store,
558        entry: Entry,
559        path: impl AsRef<Path>,
560        mode: ExportMode,
561    ) -> Result<ExportProgress> {
562        self.ensure_open()?;
563        let hash = entry.content_hash();
564        let progress = blobs.export_with_opts(ExportOptions {
565            hash,
566            mode,
567            target: path.as_ref().to_path_buf(),
568        });
569        Ok(progress)
570    }
571}
572
573#[derive(Debug)]
574pub enum ImportFileProgressItem {
575    Error(anyhow::Error),
576    Blobs(AddProgressItem),
577    Done(ImportFileOutcome),
578}
579
580#[derive(Debug)]
581pub struct ImportFileProgress(ImportInner);
582
583#[derive(derive_more::Debug)]
584enum ImportInner {
585    #[debug("Blobs")]
586    Blobs(
587        n0_future::boxed::BoxStream<AddProgressItem>,
588        Option<EntryContext>,
589    ),
590    #[debug("Entry")]
591    Entry(n0_future::boxed::BoxFuture<Result<ImportFileOutcome>>),
592    Done,
593}
594
595struct EntryContext {
596    doc: Doc,
597    author: AuthorId,
598    key: Bytes,
599    size: Option<u64>,
600}
601
602impl Stream for ImportFileProgress {
603    type Item = ImportFileProgressItem;
604
605    fn poll_next(
606        self: Pin<&mut Self>,
607        cx: &mut std::task::Context<'_>,
608    ) -> Poll<Option<Self::Item>> {
609        let this = self.get_mut();
610        match this.0 {
611            ImportInner::Blobs(ref mut progress, ref mut context) => {
612                match ready!(progress.poll_next(cx)) {
613                    Some(item) => match item {
614                        AddProgressItem::Size(size) => {
615                            context
616                                .as_mut()
617                                .expect("Size must be emitted before done")
618                                .size = Some(size);
619                            Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Size(
620                                size,
621                            ))))
622                        }
623                        AddProgressItem::Error(err) => {
624                            *this = Self(ImportInner::Done);
625                            Poll::Ready(Some(ImportFileProgressItem::Error(err.into())))
626                        }
627                        AddProgressItem::Done(tag) => {
628                            let EntryContext {
629                                doc,
630                                author,
631                                key,
632                                size,
633                            } = context
634                                .take()
635                                .expect("AddProgressItem::Done may be emitted only once");
636                            let size = size.expect("Size must be emitted before done");
637                            let hash = *tag.hash();
638                            *this = Self(ImportInner::Entry(Box::pin(async move {
639                                doc.set_hash(author, key.clone(), hash, size).await?;
640                                Ok(ImportFileOutcome { hash, size, key })
641                            })));
642                            Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Done(
643                                tag,
644                            ))))
645                        }
646                        item => Poll::Ready(Some(ImportFileProgressItem::Blobs(item))),
647                    },
648                    None => todo!(),
649                }
650            }
651            ImportInner::Entry(ref mut fut) => {
652                let res = ready!(fut.poll(cx));
653                *this = Self(ImportInner::Done);
654                match res {
655                    Ok(outcome) => Poll::Ready(Some(ImportFileProgressItem::Done(outcome))),
656                    Err(err) => Poll::Ready(Some(ImportFileProgressItem::Error(err))),
657                }
658            }
659            ImportInner::Done => Poll::Ready(None),
660        }
661    }
662}
663
664impl Future for ImportFileProgress {
665    type Output = Result<ImportFileOutcome>;
666    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
667        loop {
668            match self.as_mut().poll_next(cx) {
669                Poll::Ready(Some(item)) => match item {
670                    ImportFileProgressItem::Error(error) => return Poll::Ready(Err(error)),
671                    ImportFileProgressItem::Blobs(_add_progress_item) => continue,
672                    ImportFileProgressItem::Done(outcome) => return Poll::Ready(Ok(outcome)),
673                },
674                Poll::Ready(None) => {
675                    return Poll::Ready(Err(anyhow::anyhow!(
676                        "ImportFileProgress polled after completion"
677                    )))
678                }
679                Poll::Pending => return Poll::Pending,
680            }
681        }
682    }
683}
684
685/// Outcome of a [`Doc::import_file`] operation
686#[derive(Debug, Clone, PartialEq, Eq)]
687pub struct ImportFileOutcome {
688    /// The hash of the entry's content
689    pub hash: Hash,
690    /// The size of the entry
691    pub size: u64,
692    /// The key of the entry
693    pub key: Bytes,
694}