iroh_blobs/
net_protocol.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//! Adaptation of `iroh-blobs` as an [`iroh`] [`ProtocolHandler`].
//!
//! This is the easiest way to share data from a [`crate::api::Store`] over iroh connections.
//!
//! # Example
//!
//! ```rust
//! # async fn example() -> anyhow::Result<()> {
//! use iroh::{protocol::Router, Endpoint};
//! use iroh_blobs::{net_protocol::Blobs, store};
//!
//! // create a store
//! let store = store::fs::FsStore::load("blobs").await?;
//!
//! // add some data
//! let t = store.add_slice(b"hello world").await?;
//!
//! // create an iroh endpoint
//! let endpoint = Endpoint::builder().discovery_n0().bind().await?;
//!
//! // create a blobs protocol handler
//! let blobs = Blobs::new(&store, endpoint.clone(), None);
//!
//! // create a router and add the blobs protocol handler
//! let router = Router::builder(endpoint)
//!     .accept(iroh_blobs::ALPN, blobs.clone())
//!     .spawn();
//!
//! // this data is now globally available using the ticket
//! let ticket = blobs.ticket(t).await?;
//! println!("ticket: {}", ticket);
//!
//! // wait for control-c to exit
//! tokio::signal::ctrl_c().await?;
//! #   Ok(())
//! # }
//! ```

use std::{fmt::Debug, future::Future, sync::Arc};

use iroh::{
    endpoint::Connection,
    protocol::{AcceptError, ProtocolHandler},
    Endpoint, Watcher,
};
use tokio::sync::mpsc;
use tracing::error;

use crate::{
    api::Store,
    provider::{Event, EventSender},
    ticket::BlobTicket,
    HashAndFormat,
};

#[derive(Debug)]
pub(crate) struct BlobsInner {
    pub(crate) store: Store,
    pub(crate) endpoint: Endpoint,
    pub(crate) events: EventSender,
}

/// A protocol handler for the blobs protocol.
#[derive(Debug, Clone)]
pub struct Blobs {
    pub(crate) inner: Arc<BlobsInner>,
}

impl Blobs {
    pub fn new(store: &Store, endpoint: Endpoint, events: Option<mpsc::Sender<Event>>) -> Self {
        Self {
            inner: Arc::new(BlobsInner {
                store: store.clone(),
                endpoint,
                events: EventSender::new(events),
            }),
        }
    }

    pub fn store(&self) -> &Store {
        &self.inner.store
    }

    pub fn endpoint(&self) -> &Endpoint {
        &self.inner.endpoint
    }

    /// Create a ticket for content on this node.
    ///
    /// Note that this does not check whether the content is partially or fully available. It is
    /// just a convenience method to create a ticket from content and the address of this node.
    pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
        let content = content.into();
        let addr = self.inner.endpoint.node_addr().initialized().await?;
        let ticket = BlobTicket::new(addr, content.hash, content.format);
        Ok(ticket)
    }
}

impl ProtocolHandler for Blobs {
    fn accept(
        &self,
        conn: Connection,
    ) -> impl Future<Output = std::result::Result<(), AcceptError>> + Send {
        let store = self.store().clone();
        let events = self.inner.events.clone();

        Box::pin(async move {
            crate::provider::handle_connection(conn, store, events).await;
            Ok(())
        })
    }

    fn shutdown(&self) -> impl Future<Output = ()> + Send {
        let store = self.store().clone();
        Box::pin(async move {
            if let Err(cause) = store.shutdown().await {
                error!("error shutting down store: {:?}", cause);
            }
        })
    }
}