wasmcloud_host/wasmbus/
component_spec.rs

1//! Host interactions with JetStream, including processing of KV entries and
2//! storing/retrieving component specifications.
3
4use anyhow::Context as _;
5use serde::{Deserialize, Serialize};
6use tracing::{error, instrument, warn};
7use wasmcloud_control_interface::Link;
8
9use crate::wasmbus::component_import_links;
10
11#[derive(Debug, Serialize, Deserialize, Default)]
12/// The specification of a component that is or did run in the lattice. This contains all of the information necessary to
13/// instantiate a component in the lattice (url and digest) as well as configuration and links in order to facilitate
14/// runtime execution of the component. Each `import` in a component's WIT world will need a corresponding link for the
15/// host runtime to route messages to the correct component.
16pub struct ComponentSpecification {
17    /// The URL of the component, file, OCI, or otherwise
18    pub(crate) url: String,
19    /// All outbound links from this component to other components, used for routing when calling a component `import`
20    #[serde(default)]
21    pub(crate) links: Vec<Link>,
22    ////
23    // Possible additions in the future, left in as comments to facilitate discussion
24    ////
25    // /// The claims embedded in the component, if present
26    // claims: Option<Claims>,
27    // /// SHA256 digest of the component, used for checking uniqueness of component IDs
28    // digest: String
29    // /// (Advanced) Additional routing topics to subscribe on in addition to the component ID.
30    // routing_groups: Vec<String>,
31}
32
33impl ComponentSpecification {
34    /// Create a new empty component specification with the given ID and URL
35    pub fn new(url: impl AsRef<str>) -> Self {
36        Self {
37            url: url.as_ref().to_string(),
38            links: Vec::new(),
39        }
40    }
41}
42
43impl super::Host {
44    /// Retrieve a component specification based on the provided ID. The outer Result is for errors
45    /// accessing the store, and the inner option indicates if the spec exists.
46    #[instrument(level = "debug", skip_all)]
47    pub(crate) async fn get_component_spec(
48        &self,
49        id: &str,
50    ) -> anyhow::Result<Option<ComponentSpecification>> {
51        let key = format!("COMPONENT_{id}");
52        let spec = self
53            .data_store
54            .get(&key)
55            .await
56            .context("failed to get component spec")?
57            .map(|spec_bytes| serde_json::from_slice(&spec_bytes))
58            .transpose()
59            .context(format!(
60                "failed to deserialize stored component specification for {id}"
61            ))?;
62        Ok(spec)
63    }
64
65    #[instrument(level = "debug", skip_all)]
66    pub(crate) async fn store_component_spec(
67        &self,
68        id: impl AsRef<str>,
69        spec: &ComponentSpecification,
70    ) -> anyhow::Result<()> {
71        let id = id.as_ref();
72        let key = format!("COMPONENT_{id}");
73        let bytes = serde_json::to_vec(spec)
74            .context("failed to serialize component spec")?
75            .into();
76        self.data_store
77            .put(&key, bytes)
78            .await
79            .context("failed to put component spec")?;
80        Ok(())
81    }
82
83    #[instrument(level = "debug", skip_all)]
84    pub(crate) async fn delete_component_spec(&self, id: impl AsRef<str>) -> anyhow::Result<()> {
85        let id = id.as_ref();
86        let key = format!("COMPONENT_{id}");
87        self.data_store
88            .del(&key)
89            .await
90            .context("failed to delete component spec")?;
91        if self.components.read().await.get(id).is_some() {
92            warn!(
93                component_id = id,
94                "component spec deleted, but component is still running"
95            );
96        }
97        Ok(())
98    }
99
100    #[instrument(level = "debug", skip_all)]
101    /// Update the component specification in the host map. This will also update the links in the
102    /// component handler if the component is already running. This will also send the new links to
103    /// any providers that are the source or target of the link.
104    ///
105    /// You must not be holding the following locks when calling this function:
106    /// - `self.links`
107    /// - `self.providers`
108    /// - `self.components`
109    pub(crate) async fn update_host_with_spec(
110        &self,
111        id: impl AsRef<str>,
112        spec: &ComponentSpecification,
113    ) -> anyhow::Result<()> {
114        // Compute all new links that do not exist in the host map, which we'll use to
115        // publish to any running providers that are the source or target of the link.
116        // Computing this ahead of time is a tradeoff to hold only one lock at the cost of
117        // allocating an extra Vec. This may be a good place to optimize allocations.
118        let new_links = {
119            let all_links = self.links.read().await;
120            spec.links
121                .iter()
122                .filter(|spec_link| {
123                    // Retain only links that do not exist in the host map
124                    !all_links
125                        .iter()
126                        .filter_map(|(source_id, links)| {
127                            // Only consider links that are either the source or target of the new link
128                            if source_id == spec_link.source_id() || source_id == spec_link.target()
129                            {
130                                Some(links)
131                            } else {
132                                None
133                            }
134                        })
135                        .flatten()
136                        .any(|host_link| *spec_link == host_link)
137                })
138                .collect::<Vec<_>>()
139        };
140
141        {
142            // Acquire lock once in this block to avoid continually trying to acquire it.
143            let providers = self.providers.read().await;
144            // For every new link, if a provider is running on this host as the source or target,
145            // send the link to the provider for handling based on the xkey public key.
146            for link in new_links {
147                if let Some(provider) = providers.get(link.source_id()) {
148                    if let Err(e) = self.put_provider_link(provider, link).await {
149                        error!(?e, "failed to put provider link");
150                    }
151                }
152                if let Some(provider) = providers.get(link.target()) {
153                    if let Err(e) = self.put_provider_link(provider, link).await {
154                        error!(?e, "failed to put provider link");
155                    }
156                }
157            }
158        }
159
160        // If the component is already running, update the links
161        if let Some(component) = self.components.write().await.get(id.as_ref()) {
162            *component.handler.instance_links.write().await = component_import_links(&spec.links);
163            // NOTE(brooksmtownsend): We can consider updating the component if the image URL changes
164        };
165
166        // Insert the links into host map
167        self.links
168            .write()
169            .await
170            .insert(id.as_ref().to_string(), spec.links.clone());
171
172        Ok(())
173    }
174}