iroh_docs/
api.rs

1//! irpc-based RPC implementation for docs.
2
3#![allow(missing_docs)]
4
5use std::{
6    future::Future,
7    path::Path,
8    pin::Pin,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13    task::{ready, Poll},
14};
15
16use anyhow::Result;
17use bytes::Bytes;
18use iroh::EndpointAddr;
19use iroh_blobs::{
20    api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress},
21    Hash,
22};
23use n0_future::{FutureExt, Stream, StreamExt};
24
25use self::{
26    actor::RpcActor,
27    protocol::{
28        AddrInfoOptions, AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest,
29        AuthorGetDefaultRequest, AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest,
30        CloseRequest, CreateRequest, DelRequest, DocsProtocol, DropRequest,
31        GetDownloadPolicyRequest, GetExactRequest, GetManyRequest, GetSyncPeersRequest,
32        ImportRequest, LeaveRequest, ListRequest, OpenRequest, SetDownloadPolicyRequest,
33        SetHashRequest, SetRequest, ShareMode, ShareRequest, StartSyncRequest, StatusRequest,
34        SubscribeRequest,
35    },
36};
37use crate::{
38    actor::OpenState,
39    engine::{Engine, LiveEvent},
40    store::{DownloadPolicy, Query},
41    Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes,
42};
43
44pub(crate) mod actor;
45pub mod protocol;
46
/// Alias for `serde_error::Error`, used as the error type of [`RpcResult`].
pub type RpcError = serde_error::Error;
/// Result alias whose error type is [`RpcError`].
pub type RpcResult<T> = std::result::Result<T, RpcError>;

/// The irpc client type specialized to [`DocsProtocol`].
type Client = irpc::Client<DocsProtocol>;
51
/// API wrapper for the docs service
///
/// Wraps an irpc client for [`DocsProtocol`]; the request methods on this type are sent
/// through that client.
#[derive(Debug, Clone)]
pub struct DocsApi {
    /// The underlying irpc client used to issue requests.
    pub(crate) inner: Client,
}
57
58impl DocsApi {
59    /// Create a new docs API from an engine
60    pub fn spawn(engine: Arc<Engine>) -> Self {
61        RpcActor::spawn(engine)
62    }
63
64    /// Connect to a remote docs service
65    #[cfg(feature = "rpc")]
66    pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Result<DocsApi> {
67        Ok(DocsApi {
68            inner: Client::quinn(endpoint, addr),
69        })
70    }
71
72    /// Listen for incoming RPC connections
73    #[cfg(feature = "rpc")]
74    pub fn listen(
75        &self,
76        endpoint: quinn::Endpoint,
77    ) -> Result<n0_future::task::AbortOnDropHandle<()>> {
78        use anyhow::Context;
79        let local = self
80            .inner
81            .as_local()
82            .context("cannot listen on remote API")?;
83        let handler: irpc::rpc::Handler<DocsProtocol> = Arc::new(move |msg, _rx, tx| {
84            let local = local.clone();
85            Box::pin(async move {
86                match msg {
87                    DocsProtocol::Open(msg) => local.send((msg, tx)).await,
88                    DocsProtocol::Close(msg) => local.send((msg, tx)).await,
89                    DocsProtocol::Status(msg) => local.send((msg, tx)).await,
90                    DocsProtocol::List(msg) => local.send((msg, tx)).await,
91                    DocsProtocol::Create(msg) => local.send((msg, tx)).await,
92                    DocsProtocol::Drop(msg) => local.send((msg, tx)).await,
93                    DocsProtocol::Import(msg) => local.send((msg, tx)).await,
94                    DocsProtocol::Set(msg) => local.send((msg, tx)).await,
95                    DocsProtocol::SetHash(msg) => local.send((msg, tx)).await,
96                    DocsProtocol::Get(msg) => local.send((msg, tx)).await,
97                    DocsProtocol::GetExact(msg) => local.send((msg, tx)).await,
98                    // DocsProtocol::ImportFile(msg) => local.send((msg, tx)).await,
99                    // DocsProtocol::ExportFile(msg) => local.send((msg, tx)).await,
100                    DocsProtocol::Del(msg) => local.send((msg, tx)).await,
101                    DocsProtocol::StartSync(msg) => local.send((msg, tx)).await,
102                    DocsProtocol::Leave(msg) => local.send((msg, tx)).await,
103                    DocsProtocol::Share(msg) => local.send((msg, tx)).await,
104                    DocsProtocol::Subscribe(msg) => local.send((msg, tx)).await,
105                    DocsProtocol::GetDownloadPolicy(msg) => local.send((msg, tx)).await,
106                    DocsProtocol::SetDownloadPolicy(msg) => local.send((msg, tx)).await,
107                    DocsProtocol::GetSyncPeers(msg) => local.send((msg, tx)).await,
108                    DocsProtocol::AuthorList(msg) => local.send((msg, tx)).await,
109                    DocsProtocol::AuthorCreate(msg) => local.send((msg, tx)).await,
110                    DocsProtocol::AuthorGetDefault(msg) => local.send((msg, tx)).await,
111                    DocsProtocol::AuthorSetDefault(msg) => local.send((msg, tx)).await,
112                    DocsProtocol::AuthorImport(msg) => local.send((msg, tx)).await,
113                    DocsProtocol::AuthorExport(msg) => local.send((msg, tx)).await,
114                    DocsProtocol::AuthorDelete(msg) => local.send((msg, tx)).await,
115                }
116            })
117        });
118        let join_handle = n0_future::task::spawn(irpc::rpc::listen(endpoint, handler));
119        Ok(n0_future::task::AbortOnDropHandle::new(join_handle))
120    }
121
122    /// Creates a new document author.
123    ///
124    /// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author
125    /// again.
126    ///
127    /// If you need only a single author, use [`Self::author_default`].
128    pub async fn author_create(&self) -> Result<AuthorId> {
129        let response = self.inner.rpc(AuthorCreateRequest).await??;
130        Ok(response.author_id)
131    }
132
133    /// Returns the default document author of this node.
134    ///
135    /// On persistent nodes, the author is created on first start and its public key is saved
136    /// in the data directory.
137    ///
138    /// The default author can be set with [`Self::author_set_default`].
139    pub async fn author_default(&self) -> Result<AuthorId> {
140        let response = self.inner.rpc(AuthorGetDefaultRequest).await??;
141        Ok(response.author_id)
142    }
143
144    /// Sets the node-wide default author.
145    ///
146    /// If the author does not exist, an error is returned.
147    ///
148    /// On a persistent node, the author id will be saved to a file in the data directory and
149    /// reloaded after a restart.
150    pub async fn author_set_default(&self, author_id: AuthorId) -> Result<()> {
151        self.inner
152            .rpc(AuthorSetDefaultRequest { author_id })
153            .await??;
154        Ok(())
155    }
156
157    /// Lists document authors for which we have a secret key.
158    ///
159    /// It's only possible to create writes from authors that we have the secret key of.
160    pub async fn author_list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
161        let stream = self.inner.server_streaming(AuthorListRequest, 64).await?;
162        Ok(stream.into_stream().map(|res| match res {
163            Err(err) => Err(err.into()),
164            Ok(Err(err)) => Err(err.into()),
165            Ok(Ok(res)) => Ok(res.author_id),
166        }))
167    }
168
169    /// Exports the given author.
170    ///
171    /// Warning: The [`Author`] struct contains sensitive data.
172    pub async fn author_export(&self, author: AuthorId) -> Result<Option<Author>> {
173        let response = self.inner.rpc(AuthorExportRequest { author }).await??;
174        Ok(response.author)
175    }
176
177    /// Imports the given author.
178    ///
179    /// Warning: The [`Author`] struct contains sensitive data.
180    pub async fn author_import(&self, author: Author) -> Result<()> {
181        self.inner.rpc(AuthorImportRequest { author }).await??;
182        Ok(())
183    }
184
185    /// Deletes the given author by id.
186    ///
187    /// Warning: This permanently removes this author.
188    ///
189    /// Returns an error if attempting to delete the default author.
190    pub async fn author_delete(&self, author: AuthorId) -> Result<()> {
191        self.inner.rpc(AuthorDeleteRequest { author }).await??;
192        Ok(())
193    }
194
195    /// Creates a new document.
196    pub async fn create(&self) -> Result<Doc> {
197        let response = self.inner.rpc(CreateRequest).await??;
198        Ok(Doc::new(self.inner.clone(), response.id))
199    }
200
201    /// Deletes a document from the local node.
202    ///
203    /// This is a destructive operation. Both the document secret key and all entries in the
204    /// document will be permanently deleted from the node's storage. Content blobs will be deleted
205    /// through garbage collection unless they are referenced from another document or tag.
206    pub async fn drop_doc(&self, doc_id: NamespaceId) -> Result<()> {
207        self.inner.rpc(DropRequest { doc_id }).await??;
208        Ok(())
209    }
210
211    /// Imports a document from a namespace capability.
212    ///
213    /// This does not start sync automatically. Use [`Doc::start_sync`] to start sync.
214    pub async fn import_namespace(&self, capability: Capability) -> Result<Doc> {
215        let response = self.inner.rpc(ImportRequest { capability }).await??;
216        Ok(Doc::new(self.inner.clone(), response.doc_id))
217    }
218
219    /// Imports a document from a ticket and joins all peers in the ticket.
220    pub async fn import(&self, ticket: DocTicket) -> Result<Doc> {
221        let DocTicket { capability, nodes } = ticket;
222        let doc = self.import_namespace(capability).await?;
223        doc.start_sync(nodes).await?;
224        Ok(doc)
225    }
226
227    /// Imports a document from a ticket, creates a subscription stream and joins all peers in the ticket.
228    ///
229    /// Returns the [`Doc`] and a [`Stream`] of [`LiveEvent`]s.
230    ///
231    /// The subscription stream is created before the sync is started, so the first call to this
232    /// method after starting the node is guaranteed to not miss any sync events.
233    pub async fn import_and_subscribe(
234        &self,
235        ticket: DocTicket,
236    ) -> Result<(Doc, impl Stream<Item = Result<LiveEvent>>)> {
237        let DocTicket { capability, nodes } = ticket;
238        let response = self.inner.rpc(ImportRequest { capability }).await??;
239        let doc = Doc::new(self.inner.clone(), response.doc_id);
240        let events = doc.subscribe().await?;
241        doc.start_sync(nodes).await?;
242        Ok((doc, events))
243    }
244
245    /// Lists all documents.
246    pub async fn list(
247        &self,
248    ) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>> + Unpin + Send + 'static>
249    {
250        let stream = self.inner.server_streaming(ListRequest, 64).await?;
251        let stream = Box::pin(stream.into_stream());
252        Ok(stream.map(|res| match res {
253            Err(err) => Err(err.into()),
254            Ok(Err(err)) => Err(err.into()),
255            Ok(Ok(res)) => Ok((res.id, res.capability)),
256        }))
257    }
258
259    /// Returns a [`Doc`] client for a single document.
260    ///
261    /// Returns None if the document cannot be found.
262    pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc>> {
263        self.inner.rpc(OpenRequest { doc_id: id }).await??;
264        Ok(Some(Doc::new(self.inner.clone(), id)))
265    }
266}
267
/// Document handle
///
/// Cloning a handle shares the `closed` flag, so closing one clone marks all clones as closed.
#[derive(Debug, Clone)]
pub struct Doc {
    /// Client used to send requests for this document.
    inner: Client,
    /// Id of the document this handle refers to; sent as `doc_id` in requests.
    namespace_id: NamespaceId,
    /// Set to `true` by [`Doc::close`]; checked by the other operations before they run.
    closed: Arc<AtomicBool>,
}
275
276impl Doc {
277    fn new(inner: Client, namespace_id: NamespaceId) -> Self {
278        Self {
279            inner,
280            namespace_id,
281            closed: Default::default(),
282        }
283    }
284
285    /// Returns the document id of this doc.
286    pub fn id(&self) -> NamespaceId {
287        self.namespace_id
288    }
289
290    /// Closes the document.
291    pub async fn close(&self) -> Result<()> {
292        self.closed.store(true, Ordering::Relaxed);
293        self.inner
294            .rpc(CloseRequest {
295                doc_id: self.namespace_id,
296            })
297            .await??;
298        Ok(())
299    }
300
301    fn ensure_open(&self) -> Result<()> {
302        if self.closed.load(Ordering::Relaxed) {
303            Err(anyhow::anyhow!("document is closed"))
304        } else {
305            Ok(())
306        }
307    }
308
309    /// Sets the content of a key to a byte array.
310    pub async fn set_bytes(
311        &self,
312        author_id: AuthorId,
313        key: impl Into<Bytes>,
314        value: impl Into<Bytes>,
315    ) -> Result<Hash> {
316        self.ensure_open()?;
317        let response = self
318            .inner
319            .rpc(SetRequest {
320                doc_id: self.namespace_id,
321                author_id,
322                key: key.into(),
323                value: value.into(),
324            })
325            .await??;
326        Ok(response.entry.content_hash())
327    }
328
329    /// Sets an entry on the doc via its key, hash, and size.
330    pub async fn set_hash(
331        &self,
332        author_id: AuthorId,
333        key: impl Into<Bytes>,
334        hash: Hash,
335        size: u64,
336    ) -> Result<()> {
337        self.ensure_open()?;
338        self.inner
339            .rpc(SetHashRequest {
340                doc_id: self.namespace_id,
341                author_id,
342                key: key.into(),
343                hash,
344                size,
345            })
346            .await??;
347        Ok(())
348    }
349
350    /// Deletes entries that match the given `author` and key `prefix`.
351    ///
352    /// This inserts an empty entry with the key set to `prefix`, effectively clearing all other
353    /// entries whose key starts with or is equal to the given `prefix`.
354    ///
355    /// Returns the number of entries deleted.
356    pub async fn del(&self, author_id: AuthorId, prefix: impl Into<Bytes>) -> Result<usize> {
357        self.ensure_open()?;
358        let response = self
359            .inner
360            .rpc(DelRequest {
361                doc_id: self.namespace_id,
362                author_id,
363                prefix: prefix.into(),
364            })
365            .await??;
366        Ok(response.removed)
367    }
368
369    /// Returns an entry for a key and author.
370    ///
371    /// Optionally also returns the entry unless it is empty (i.e. a deletion marker).
372    pub async fn get_exact(
373        &self,
374        author: AuthorId,
375        key: impl AsRef<[u8]>,
376        include_empty: bool,
377    ) -> Result<Option<Entry>> {
378        self.ensure_open()?;
379        let response = self
380            .inner
381            .rpc(GetExactRequest {
382                author,
383                key: key.as_ref().to_vec().into(),
384                doc_id: self.namespace_id,
385                include_empty,
386            })
387            .await??;
388        Ok(response.entry.map(|entry| entry.into()))
389    }
390
391    /// Returns all entries matching the query.
392    pub async fn get_many(
393        &self,
394        query: impl Into<Query>,
395    ) -> Result<impl Stream<Item = Result<Entry>>> {
396        self.ensure_open()?;
397        let stream = self
398            .inner
399            .server_streaming(
400                GetManyRequest {
401                    doc_id: self.namespace_id,
402                    query: query.into(),
403                },
404                64,
405            )
406            .await?;
407        Ok(stream.into_stream().map(|res| match res {
408            Err(err) => Err(err.into()),
409            Ok(Err(err)) => Err(err.into()),
410            Ok(Ok(res)) => Ok(res.into()),
411        }))
412    }
413
414    /// Returns a single entry.
415    pub async fn get_one(&self, query: impl Into<Query>) -> Result<Option<Entry>> {
416        self.ensure_open()?;
417        let stream = self.get_many(query).await?;
418        tokio::pin!(stream);
419        futures_lite::StreamExt::next(&mut stream).await.transpose()
420    }
421
422    /// Shares this document with peers over a ticket.
423    pub async fn share(&self, mode: ShareMode, addr_options: AddrInfoOptions) -> Result<DocTicket> {
424        self.ensure_open()?;
425        let response = self
426            .inner
427            .rpc(ShareRequest {
428                doc_id: self.namespace_id,
429                mode,
430                addr_options,
431            })
432            .await??;
433        Ok(response.0)
434    }
435
436    /// Starts to sync this document with a list of peers.
437    pub async fn start_sync(&self, peers: Vec<EndpointAddr>) -> Result<()> {
438        self.ensure_open()?;
439        self.inner
440            .rpc(StartSyncRequest {
441                doc_id: self.namespace_id,
442                peers,
443            })
444            .await??;
445        Ok(())
446    }
447
448    /// Stops the live sync for this document.
449    pub async fn leave(&self) -> Result<()> {
450        self.ensure_open()?;
451        self.inner
452            .rpc(LeaveRequest {
453                doc_id: self.namespace_id,
454            })
455            .await??;
456        Ok(())
457    }
458
459    /// Subscribes to events for this document.
460    pub async fn subscribe(
461        &self,
462    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Send + Unpin + 'static> {
463        self.ensure_open()?;
464        let stream = self
465            .inner
466            .server_streaming(
467                SubscribeRequest {
468                    doc_id: self.namespace_id,
469                },
470                64,
471            )
472            .await?;
473        Ok(Box::pin(stream.into_stream().map(|res| match res {
474            Err(err) => Err(err.into()),
475            Ok(Err(err)) => Err(err.into()),
476            Ok(Ok(res)) => Ok(res.event),
477        })))
478    }
479
480    /// Returns status info for this document
481    pub async fn status(&self) -> Result<OpenState> {
482        self.ensure_open()?;
483        let response = self
484            .inner
485            .rpc(StatusRequest {
486                doc_id: self.namespace_id,
487            })
488            .await??;
489        Ok(response.status)
490    }
491
492    /// Sets the download policy for this document
493    pub async fn set_download_policy(&self, policy: DownloadPolicy) -> Result<()> {
494        self.ensure_open()?;
495        self.inner
496            .rpc(SetDownloadPolicyRequest {
497                doc_id: self.namespace_id,
498                policy,
499            })
500            .await??;
501        Ok(())
502    }
503
504    /// Returns the download policy for this document
505    pub async fn get_download_policy(&self) -> Result<DownloadPolicy> {
506        self.ensure_open()?;
507        let response = self
508            .inner
509            .rpc(GetDownloadPolicyRequest {
510                doc_id: self.namespace_id,
511            })
512            .await??;
513        Ok(response.policy)
514    }
515
516    /// Returns sync peers for this document
517    pub async fn get_sync_peers(&self) -> Result<Option<Vec<PeerIdBytes>>> {
518        self.ensure_open()?;
519        let response = self
520            .inner
521            .rpc(GetSyncPeersRequest {
522                doc_id: self.namespace_id,
523            })
524            .await??;
525        Ok(response.peers)
526    }
527
528    /// Adds an entry from an absolute file path
529    pub async fn import_file(
530        &self,
531        blobs: &iroh_blobs::api::Store,
532        author: AuthorId,
533        key: Bytes,
534        path: impl AsRef<Path>,
535        import_mode: iroh_blobs::api::blobs::ImportMode,
536    ) -> Result<ImportFileProgress> {
537        self.ensure_open()?;
538        let progress = blobs.add_path_with_opts(AddPathOptions {
539            path: path.as_ref().to_owned(),
540            format: iroh_blobs::BlobFormat::Raw,
541            mode: import_mode,
542        });
543        let stream = progress.stream().await;
544        let doc = self.clone();
545        let ctx = EntryContext {
546            doc,
547            author,
548            key,
549            size: None,
550        };
551        Ok(ImportFileProgress(ImportInner::Blobs(
552            Box::pin(stream),
553            Some(ctx),
554        )))
555    }
556
557    /// Exports an entry as a file to a given absolute path.
558    pub async fn export_file(
559        &self,
560        blobs: &iroh_blobs::api::Store,
561        entry: Entry,
562        path: impl AsRef<Path>,
563        mode: ExportMode,
564    ) -> Result<ExportProgress> {
565        self.ensure_open()?;
566        let hash = entry.content_hash();
567        let progress = blobs.export_with_opts(ExportOptions {
568            hash,
569            mode,
570            target: path.as_ref().to_path_buf(),
571        });
572        Ok(progress)
573    }
574}
575
/// A single item yielded by the [`ImportFileProgress`] stream.
#[derive(Debug)]
pub enum ImportFileProgressItem {
    /// The import failed, either while adding the blob or while setting the document entry.
    Error(anyhow::Error),
    /// A progress item forwarded from the blob store's add operation.
    Blobs(AddProgressItem),
    /// The document entry was set; carries the final outcome.
    Done(ImportFileOutcome),
}
582
/// Progress of a [`Doc::import_file`] operation.
///
/// Can be consumed as a [`Stream`] of [`ImportFileProgressItem`]s, or awaited as a [`Future`]
/// that resolves to the final [`ImportFileOutcome`].
#[derive(Debug)]
pub struct ImportFileProgress(ImportInner);
585
/// Internal state machine behind [`ImportFileProgress`].
#[derive(derive_more::Debug)]
enum ImportInner {
    /// The blob is being added; progress items are forwarded from the boxed stream.
    /// The context is taken out when the blob store reports completion.
    #[debug("Blobs")]
    Blobs(
        n0_future::boxed::BoxStream<AddProgressItem>,
        Option<EntryContext>,
    ),
    /// The blob was added and the document entry is being set.
    #[debug("Entry")]
    Entry(n0_future::boxed::BoxFuture<Result<ImportFileOutcome>>),
    /// Terminal state: the stream yields no further items.
    Done,
}
597
/// Everything needed to set the document entry once the blob has been added.
struct EntryContext {
    /// Document on which the entry is set.
    doc: Doc,
    /// Author the entry is written as.
    author: AuthorId,
    /// Key of the entry.
    key: Bytes,
    /// Size from the blob progress stream; `None` until a size item has been seen.
    size: Option<u64>,
}
604
605impl Stream for ImportFileProgress {
606    type Item = ImportFileProgressItem;
607
608    fn poll_next(
609        self: Pin<&mut Self>,
610        cx: &mut std::task::Context<'_>,
611    ) -> Poll<Option<Self::Item>> {
612        let this = self.get_mut();
613        match this.0 {
614            ImportInner::Blobs(ref mut progress, ref mut context) => {
615                match ready!(progress.poll_next(cx)) {
616                    Some(item) => match item {
617                        AddProgressItem::Size(size) => {
618                            context
619                                .as_mut()
620                                .expect("Size must be emitted before done")
621                                .size = Some(size);
622                            Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Size(
623                                size,
624                            ))))
625                        }
626                        AddProgressItem::Error(err) => {
627                            *this = Self(ImportInner::Done);
628                            Poll::Ready(Some(ImportFileProgressItem::Error(err.into())))
629                        }
630                        AddProgressItem::Done(tag) => {
631                            let EntryContext {
632                                doc,
633                                author,
634                                key,
635                                size,
636                            } = context
637                                .take()
638                                .expect("AddProgressItem::Done may be emitted only once");
639                            let size = size.expect("Size must be emitted before done");
640                            let hash = tag.hash();
641                            *this = Self(ImportInner::Entry(Box::pin(async move {
642                                doc.set_hash(author, key.clone(), hash, size).await?;
643                                Ok(ImportFileOutcome { hash, size, key })
644                            })));
645                            Poll::Ready(Some(ImportFileProgressItem::Blobs(AddProgressItem::Done(
646                                tag,
647                            ))))
648                        }
649                        item => Poll::Ready(Some(ImportFileProgressItem::Blobs(item))),
650                    },
651                    None => todo!(),
652                }
653            }
654            ImportInner::Entry(ref mut fut) => {
655                let res = ready!(fut.poll(cx));
656                *this = Self(ImportInner::Done);
657                match res {
658                    Ok(outcome) => Poll::Ready(Some(ImportFileProgressItem::Done(outcome))),
659                    Err(err) => Poll::Ready(Some(ImportFileProgressItem::Error(err))),
660                }
661            }
662            ImportInner::Done => Poll::Ready(None),
663        }
664    }
665}
666
667impl Future for ImportFileProgress {
668    type Output = Result<ImportFileOutcome>;
669    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
670        loop {
671            match self.as_mut().poll_next(cx) {
672                Poll::Ready(Some(item)) => match item {
673                    ImportFileProgressItem::Error(error) => return Poll::Ready(Err(error)),
674                    ImportFileProgressItem::Blobs(_add_progress_item) => continue,
675                    ImportFileProgressItem::Done(outcome) => return Poll::Ready(Ok(outcome)),
676                },
677                Poll::Ready(None) => {
678                    return Poll::Ready(Err(anyhow::anyhow!(
679                        "ImportFileProgress polled after completion"
680                    )))
681                }
682                Poll::Pending => return Poll::Pending,
683            }
684        }
685    }
686}
687
/// Outcome of a [`Doc::import_file`] operation
///
/// Produced once the imported content has been set as an entry on the document.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportFileOutcome {
    /// The hash of the entry's content
    pub hash: Hash,
    /// The size of the entry
    pub size: u64,
    /// The key of the entry
    pub key: Bytes,
}