wasmcloud_host/wasmbus/
jetstream.rs

1//! Host interactions with JetStream, including processing of KV entries and
2//! storing/retrieving component specifications.
3
4use anyhow::{anyhow, ensure, Context as _};
5use async_nats::jetstream::kv::{Entry as KvEntry, Operation, Store};
6use serde::{Deserialize, Serialize};
7use tracing::{debug, error, info, instrument, warn};
8use wasmcloud_control_interface::Link;
9
10use crate::wasmbus::claims::{Claims, StoredClaims};
11use crate::wasmbus::component_import_links;
12
13#[derive(Debug, Serialize, Deserialize, Default)]
14/// The specification of a component that is or did run in the lattice. This contains all of the information necessary to
15/// instantiate a component in the lattice (url and digest) as well as configuration and links in order to facilitate
16/// runtime execution of the component. Each `import` in a component's WIT world will need a corresponding link for the
17/// host runtime to route messages to the correct component.
18pub struct ComponentSpecification {
19    /// The URL of the component, file, OCI, or otherwise
20    pub(crate) url: String,
21    /// All outbound links from this component to other components, used for routing when calling a component `import`
22    #[serde(default)]
23    pub(crate) links: Vec<Link>,
24    ////
25    // Possible additions in the future, left in as comments to facilitate discussion
26    ////
27    // /// The claims embedded in the component, if present
28    // claims: Option<Claims>,
29    // /// SHA256 digest of the component, used for checking uniqueness of component IDs
30    // digest: String
31    // /// (Advanced) Additional routing topics to subscribe on in addition to the component ID.
32    // routing_groups: Vec<String>,
33}
34
35impl ComponentSpecification {
36    /// Create a new empty component specification with the given ID and URL
37    pub fn new(url: impl AsRef<str>) -> Self {
38        Self {
39            url: url.as_ref().to_string(),
40            links: Vec::new(),
41        }
42    }
43}
44
45impl super::Host {
46    /// Retrieve a component specification based on the provided ID. The outer Result is for errors
47    /// accessing the store, and the inner option indicates if the spec exists.
48    #[instrument(level = "debug", skip_all)]
49    pub(crate) async fn get_component_spec(
50        &self,
51        id: &str,
52    ) -> anyhow::Result<Option<ComponentSpecification>> {
53        let key = format!("COMPONENT_{id}");
54        let spec = self
55            .data
56            .get(key)
57            .await
58            .context("failed to get component spec")?
59            .map(|spec_bytes| serde_json::from_slice(&spec_bytes))
60            .transpose()
61            .context(format!(
62                "failed to deserialize stored component specification for {id}"
63            ))?;
64        Ok(spec)
65    }
66
67    #[instrument(level = "debug", skip_all)]
68    pub(crate) async fn store_component_spec(
69        &self,
70        id: impl AsRef<str>,
71        spec: &ComponentSpecification,
72    ) -> anyhow::Result<()> {
73        let id = id.as_ref();
74        let key = format!("COMPONENT_{id}");
75        let bytes = serde_json::to_vec(spec)
76            .context("failed to serialize component spec")?
77            .into();
78        self.data
79            .put(key, bytes)
80            .await
81            .context("failed to put component spec")?;
82        Ok(())
83    }
84
85    #[instrument(level = "debug", skip_all)]
86    pub(crate) async fn process_component_spec_put(
87        &self,
88        id: impl AsRef<str>,
89        value: impl AsRef<[u8]>,
90    ) -> anyhow::Result<()> {
91        let id = id.as_ref();
92        debug!(id, "process component spec put");
93
94        let spec: ComponentSpecification = serde_json::from_slice(value.as_ref())
95            .context("failed to deserialize component specification")?;
96
97        // Compute all new links that do not exist in the host map, which we'll use to
98        // publish to any running providers that are the source or target of the link.
99        // Computing this ahead of time is a tradeoff to hold only one lock at the cost of
100        // allocating an extra Vec. This may be a good place to optimize allocations.
101        let new_links = {
102            let all_links = self.links.read().await;
103            spec.links
104                .iter()
105                .filter(|spec_link| {
106                    // Retain only links that do not exist in the host map
107                    !all_links
108                        .iter()
109                        .filter_map(|(source_id, links)| {
110                            // Only consider links that are either the source or target of the new link
111                            if source_id == spec_link.source_id() || source_id == spec_link.target()
112                            {
113                                Some(links)
114                            } else {
115                                None
116                            }
117                        })
118                        .flatten()
119                        .any(|host_link| *spec_link == host_link)
120                })
121                .collect::<Vec<_>>()
122        };
123
124        {
125            // Acquire lock once in this block to avoid continually trying to acquire it.
126            let providers = self.providers.read().await;
127            // For every new link, if a provider is running on this host as the source or target,
128            // send the link to the provider for handling based on the xkey public key.
129            for link in new_links {
130                if let Some(provider) = providers.get(link.source_id()) {
131                    if let Err(e) = self.put_provider_link(provider, link).await {
132                        error!(?e, "failed to put provider link");
133                    }
134                }
135                if let Some(provider) = providers.get(link.target()) {
136                    if let Err(e) = self.put_provider_link(provider, link).await {
137                        error!(?e, "failed to put provider link");
138                    }
139                }
140            }
141        }
142
143        // If the component is already running, update the links
144        if let Some(component) = self.components.write().await.get(id) {
145            *component.handler.instance_links.write().await = component_import_links(&spec.links);
146            // NOTE(brooksmtownsend): We can consider updating the component if the image URL changes
147        };
148
149        // Insert the links into host map
150        self.links.write().await.insert(id.to_string(), spec.links);
151
152        Ok(())
153    }
154
155    #[instrument(level = "debug", skip_all)]
156    pub(crate) async fn process_component_spec_delete(
157        &self,
158        id: impl AsRef<str>,
159    ) -> anyhow::Result<()> {
160        let id = id.as_ref();
161        debug!(id, "process component delete");
162        // TODO: TBD: stop component if spec deleted?
163        if self.components.write().await.get(id).is_some() {
164            warn!(
165                component_id = id,
166                "component spec deleted, but component is still running"
167            );
168        }
169        Ok(())
170    }
171
172    #[instrument(level = "debug", skip_all)]
173    pub(crate) async fn process_claims_put(
174        &self,
175        pubkey: impl AsRef<str>,
176        value: impl AsRef<[u8]>,
177    ) -> anyhow::Result<()> {
178        let pubkey = pubkey.as_ref();
179
180        debug!(pubkey, "process claim entry put");
181
182        let stored_claims: StoredClaims =
183            serde_json::from_slice(value.as_ref()).context("failed to decode stored claims")?;
184        let claims = Claims::from(stored_claims);
185
186        ensure!(claims.subject() == pubkey, "subject mismatch");
187        match claims {
188            Claims::Component(claims) => self.store_component_claims(claims).await,
189            Claims::Provider(claims) => {
190                let mut provider_claims = self.provider_claims.write().await;
191                provider_claims.insert(claims.subject.clone(), claims);
192                Ok(())
193            }
194        }
195    }
196
197    #[instrument(level = "debug", skip_all)]
198    pub(crate) async fn process_claims_delete(
199        &self,
200        pubkey: impl AsRef<str>,
201        value: impl AsRef<[u8]>,
202    ) -> anyhow::Result<()> {
203        let pubkey = pubkey.as_ref();
204
205        debug!(pubkey, "process claim entry deletion");
206
207        let stored_claims: StoredClaims =
208            serde_json::from_slice(value.as_ref()).context("failed to decode stored claims")?;
209        let claims = Claims::from(stored_claims);
210
211        ensure!(claims.subject() == pubkey, "subject mismatch");
212
213        match claims {
214            Claims::Component(claims) => {
215                let mut component_claims = self.component_claims.write().await;
216                component_claims.remove(&claims.subject);
217            }
218            Claims::Provider(claims) => {
219                let mut provider_claims = self.provider_claims.write().await;
220                provider_claims.remove(&claims.subject);
221            }
222        }
223
224        Ok(())
225    }
226
227    #[instrument(level = "trace", skip_all)]
228    pub(crate) async fn process_entry(
229        &self,
230        KvEntry {
231            key,
232            value,
233            operation,
234            ..
235        }: KvEntry,
236    ) {
237        let key_id = key.split_once('_');
238        let res = match (operation, key_id) {
239            (Operation::Put, Some(("COMPONENT", id))) => {
240                self.process_component_spec_put(id, value).await
241            }
242            (Operation::Delete, Some(("COMPONENT", id))) => {
243                self.process_component_spec_delete(id).await
244            }
245            (Operation::Put, Some(("LINKDEF", _id))) => {
246                debug!("ignoring deprecated LINKDEF put operation");
247                Ok(())
248            }
249            (Operation::Delete, Some(("LINKDEF", _id))) => {
250                debug!("ignoring deprecated LINKDEF delete operation");
251                Ok(())
252            }
253            (Operation::Put, Some(("CLAIMS", pubkey))) => {
254                self.process_claims_put(pubkey, value).await
255            }
256            (Operation::Delete, Some(("CLAIMS", pubkey))) => {
257                self.process_claims_delete(pubkey, value).await
258            }
259            (operation, Some(("REFMAP", id))) => {
260                // TODO: process REFMAP entries
261                debug!(?operation, id, "ignoring REFMAP entry");
262                Ok(())
263            }
264            _ => {
265                warn!(key, ?operation, "unsupported KV bucket entry");
266                Ok(())
267            }
268        };
269        if let Err(error) = &res {
270            error!(key, ?operation, ?error, "failed to process KV bucket entry");
271        }
272    }
273}
274
275#[instrument(level = "debug", skip_all)]
276pub(crate) async fn create_bucket(
277    jetstream: &async_nats::jetstream::Context,
278    bucket: &str,
279) -> anyhow::Result<Store> {
280    // Don't create the bucket if it already exists
281    if let Ok(store) = jetstream.get_key_value(bucket).await {
282        info!(%bucket, "bucket already exists. Skipping creation.");
283        return Ok(store);
284    }
285
286    match jetstream
287        .create_key_value(async_nats::jetstream::kv::Config {
288            bucket: bucket.to_string(),
289            ..Default::default()
290        })
291        .await
292    {
293        Ok(store) => {
294            info!(%bucket, "created bucket with 1 replica");
295            Ok(store)
296        }
297        Err(err) => Err(anyhow!(err).context(format!("failed to create bucket '{bucket}'"))),
298    }
299}