wasmcloud_host/wasmbus/component_spec.rs
1//! Host interactions with JetStream, including processing of KV entries and
2//! storing/retrieving component specifications.
3
4use anyhow::Context as _;
5use serde::{Deserialize, Serialize};
6use tracing::{error, instrument, warn};
7use wasmcloud_control_interface::Link;
8
9use crate::wasmbus::component_import_links;
10
11#[derive(Debug, Serialize, Deserialize, Default)]
12/// The specification of a component that is or did run in the lattice. This contains all of the information necessary to
13/// instantiate a component in the lattice (url and digest) as well as configuration and links in order to facilitate
14/// runtime execution of the component. Each `import` in a component's WIT world will need a corresponding link for the
15/// host runtime to route messages to the correct component.
16pub struct ComponentSpecification {
17 /// The URL of the component, file, OCI, or otherwise
18 pub(crate) url: String,
19 /// All outbound links from this component to other components, used for routing when calling a component `import`
20 #[serde(default)]
21 pub(crate) links: Vec<Link>,
22 ////
23 // Possible additions in the future, left in as comments to facilitate discussion
24 ////
25 // /// The claims embedded in the component, if present
26 // claims: Option<Claims>,
27 // /// SHA256 digest of the component, used for checking uniqueness of component IDs
28 // digest: String
29 // /// (Advanced) Additional routing topics to subscribe on in addition to the component ID.
30 // routing_groups: Vec<String>,
31}
32
33impl ComponentSpecification {
34 /// Create a new empty component specification with the given ID and URL
35 pub fn new(url: impl AsRef<str>) -> Self {
36 Self {
37 url: url.as_ref().to_string(),
38 links: Vec::new(),
39 }
40 }
41}
42
43impl super::Host {
44 /// Retrieve a component specification based on the provided ID. The outer Result is for errors
45 /// accessing the store, and the inner option indicates if the spec exists.
46 #[instrument(level = "debug", skip_all)]
47 pub(crate) async fn get_component_spec(
48 &self,
49 id: &str,
50 ) -> anyhow::Result<Option<ComponentSpecification>> {
51 let key = format!("COMPONENT_{id}");
52 let spec = self
53 .data_store
54 .get(&key)
55 .await
56 .context("failed to get component spec")?
57 .map(|spec_bytes| serde_json::from_slice(&spec_bytes))
58 .transpose()
59 .context(format!(
60 "failed to deserialize stored component specification for {id}"
61 ))?;
62 Ok(spec)
63 }
64
65 #[instrument(level = "debug", skip_all)]
66 pub(crate) async fn store_component_spec(
67 &self,
68 id: impl AsRef<str>,
69 spec: &ComponentSpecification,
70 ) -> anyhow::Result<()> {
71 let id = id.as_ref();
72 let key = format!("COMPONENT_{id}");
73 let bytes = serde_json::to_vec(spec)
74 .context("failed to serialize component spec")?
75 .into();
76 self.data_store
77 .put(&key, bytes)
78 .await
79 .context("failed to put component spec")?;
80 Ok(())
81 }
82
83 #[instrument(level = "debug", skip_all)]
84 pub(crate) async fn delete_component_spec(&self, id: impl AsRef<str>) -> anyhow::Result<()> {
85 let id = id.as_ref();
86 let key = format!("COMPONENT_{id}");
87 self.data_store
88 .del(&key)
89 .await
90 .context("failed to delete component spec")?;
91 if self.components.read().await.get(id).is_some() {
92 warn!(
93 component_id = id,
94 "component spec deleted, but component is still running"
95 );
96 }
97 Ok(())
98 }
99
100 #[instrument(level = "debug", skip_all)]
101 /// Update the component specification in the host map. This will also update the links in the
102 /// component handler if the component is already running. This will also send the new links to
103 /// any providers that are the source or target of the link.
104 ///
105 /// You must not be holding the following locks when calling this function:
106 /// - `self.links`
107 /// - `self.providers`
108 /// - `self.components`
109 pub(crate) async fn update_host_with_spec(
110 &self,
111 id: impl AsRef<str>,
112 spec: &ComponentSpecification,
113 ) -> anyhow::Result<()> {
114 // Compute all new links that do not exist in the host map, which we'll use to
115 // publish to any running providers that are the source or target of the link.
116 // Computing this ahead of time is a tradeoff to hold only one lock at the cost of
117 // allocating an extra Vec. This may be a good place to optimize allocations.
118 let new_links = {
119 let all_links = self.links.read().await;
120 spec.links
121 .iter()
122 .filter(|spec_link| {
123 // Retain only links that do not exist in the host map
124 !all_links
125 .iter()
126 .filter_map(|(source_id, links)| {
127 // Only consider links that are either the source or target of the new link
128 if source_id == spec_link.source_id() || source_id == spec_link.target()
129 {
130 Some(links)
131 } else {
132 None
133 }
134 })
135 .flatten()
136 .any(|host_link| *spec_link == host_link)
137 })
138 .collect::<Vec<_>>()
139 };
140
141 {
142 // Acquire lock once in this block to avoid continually trying to acquire it.
143 let providers = self.providers.read().await;
144 // For every new link, if a provider is running on this host as the source or target,
145 // send the link to the provider for handling based on the xkey public key.
146 for link in new_links {
147 if let Some(provider) = providers.get(link.source_id()) {
148 if let Err(e) = self.put_provider_link(provider, link).await {
149 error!(?e, "failed to put provider link");
150 }
151 }
152 if let Some(provider) = providers.get(link.target()) {
153 if let Err(e) = self.put_provider_link(provider, link).await {
154 error!(?e, "failed to put provider link");
155 }
156 }
157 }
158 }
159
160 // If the component is already running, update the links
161 if let Some(component) = self.components.write().await.get(id.as_ref()) {
162 *component.handler.instance_links.write().await = component_import_links(&spec.links);
163 // NOTE(brooksmtownsend): We can consider updating the component if the image URL changes
164 };
165
166 // Insert the links into host map
167 self.links
168 .write()
169 .await
170 .insert(id.as_ref().to_string(), spec.links.clone());
171
172 Ok(())
173 }
174}