1use anyhow::{anyhow, ensure, Context as _};
5use async_nats::jetstream::kv::{Entry as KvEntry, Operation, Store};
6use serde::{Deserialize, Serialize};
7use tracing::{debug, error, info, instrument, warn};
8use wasmcloud_control_interface::Link;
9
10use crate::wasmbus::claims::{Claims, StoredClaims};
11use crate::wasmbus::component_import_links;
12
13#[derive(Debug, Serialize, Deserialize, Default)]
14pub struct ComponentSpecification {
19 pub(crate) url: String,
21 #[serde(default)]
23 pub(crate) links: Vec<Link>,
24 }
34
35impl ComponentSpecification {
36 pub fn new(url: impl AsRef<str>) -> Self {
38 Self {
39 url: url.as_ref().to_string(),
40 links: Vec::new(),
41 }
42 }
43}
44
45impl super::Host {
46 #[instrument(level = "debug", skip_all)]
49 pub(crate) async fn get_component_spec(
50 &self,
51 id: &str,
52 ) -> anyhow::Result<Option<ComponentSpecification>> {
53 let key = format!("COMPONENT_{id}");
54 let spec = self
55 .data
56 .get(key)
57 .await
58 .context("failed to get component spec")?
59 .map(|spec_bytes| serde_json::from_slice(&spec_bytes))
60 .transpose()
61 .context(format!(
62 "failed to deserialize stored component specification for {id}"
63 ))?;
64 Ok(spec)
65 }
66
67 #[instrument(level = "debug", skip_all)]
68 pub(crate) async fn store_component_spec(
69 &self,
70 id: impl AsRef<str>,
71 spec: &ComponentSpecification,
72 ) -> anyhow::Result<()> {
73 let id = id.as_ref();
74 let key = format!("COMPONENT_{id}");
75 let bytes = serde_json::to_vec(spec)
76 .context("failed to serialize component spec")?
77 .into();
78 self.data
79 .put(key, bytes)
80 .await
81 .context("failed to put component spec")?;
82 Ok(())
83 }
84
85 #[instrument(level = "debug", skip_all)]
86 pub(crate) async fn process_component_spec_put(
87 &self,
88 id: impl AsRef<str>,
89 value: impl AsRef<[u8]>,
90 ) -> anyhow::Result<()> {
91 let id = id.as_ref();
92 debug!(id, "process component spec put");
93
94 let spec: ComponentSpecification = serde_json::from_slice(value.as_ref())
95 .context("failed to deserialize component specification")?;
96
97 let new_links = {
102 let all_links = self.links.read().await;
103 spec.links
104 .iter()
105 .filter(|spec_link| {
106 !all_links
108 .iter()
109 .filter_map(|(source_id, links)| {
110 if source_id == spec_link.source_id() || source_id == spec_link.target()
112 {
113 Some(links)
114 } else {
115 None
116 }
117 })
118 .flatten()
119 .any(|host_link| *spec_link == host_link)
120 })
121 .collect::<Vec<_>>()
122 };
123
124 {
125 let providers = self.providers.read().await;
127 for link in new_links {
130 if let Some(provider) = providers.get(link.source_id()) {
131 if let Err(e) = self.put_provider_link(provider, link).await {
132 error!(?e, "failed to put provider link");
133 }
134 }
135 if let Some(provider) = providers.get(link.target()) {
136 if let Err(e) = self.put_provider_link(provider, link).await {
137 error!(?e, "failed to put provider link");
138 }
139 }
140 }
141 }
142
143 if let Some(component) = self.components.write().await.get(id) {
145 *component.handler.instance_links.write().await = component_import_links(&spec.links);
146 };
148
149 self.links.write().await.insert(id.to_string(), spec.links);
151
152 Ok(())
153 }
154
155 #[instrument(level = "debug", skip_all)]
156 pub(crate) async fn process_component_spec_delete(
157 &self,
158 id: impl AsRef<str>,
159 ) -> anyhow::Result<()> {
160 let id = id.as_ref();
161 debug!(id, "process component delete");
162 if self.components.write().await.get(id).is_some() {
164 warn!(
165 component_id = id,
166 "component spec deleted, but component is still running"
167 );
168 }
169 Ok(())
170 }
171
172 #[instrument(level = "debug", skip_all)]
173 pub(crate) async fn process_claims_put(
174 &self,
175 pubkey: impl AsRef<str>,
176 value: impl AsRef<[u8]>,
177 ) -> anyhow::Result<()> {
178 let pubkey = pubkey.as_ref();
179
180 debug!(pubkey, "process claim entry put");
181
182 let stored_claims: StoredClaims =
183 serde_json::from_slice(value.as_ref()).context("failed to decode stored claims")?;
184 let claims = Claims::from(stored_claims);
185
186 ensure!(claims.subject() == pubkey, "subject mismatch");
187 match claims {
188 Claims::Component(claims) => self.store_component_claims(claims).await,
189 Claims::Provider(claims) => {
190 let mut provider_claims = self.provider_claims.write().await;
191 provider_claims.insert(claims.subject.clone(), claims);
192 Ok(())
193 }
194 }
195 }
196
197 #[instrument(level = "debug", skip_all)]
198 pub(crate) async fn process_claims_delete(
199 &self,
200 pubkey: impl AsRef<str>,
201 value: impl AsRef<[u8]>,
202 ) -> anyhow::Result<()> {
203 let pubkey = pubkey.as_ref();
204
205 debug!(pubkey, "process claim entry deletion");
206
207 let stored_claims: StoredClaims =
208 serde_json::from_slice(value.as_ref()).context("failed to decode stored claims")?;
209 let claims = Claims::from(stored_claims);
210
211 ensure!(claims.subject() == pubkey, "subject mismatch");
212
213 match claims {
214 Claims::Component(claims) => {
215 let mut component_claims = self.component_claims.write().await;
216 component_claims.remove(&claims.subject);
217 }
218 Claims::Provider(claims) => {
219 let mut provider_claims = self.provider_claims.write().await;
220 provider_claims.remove(&claims.subject);
221 }
222 }
223
224 Ok(())
225 }
226
227 #[instrument(level = "trace", skip_all)]
228 pub(crate) async fn process_entry(
229 &self,
230 KvEntry {
231 key,
232 value,
233 operation,
234 ..
235 }: KvEntry,
236 ) {
237 let key_id = key.split_once('_');
238 let res = match (operation, key_id) {
239 (Operation::Put, Some(("COMPONENT", id))) => {
240 self.process_component_spec_put(id, value).await
241 }
242 (Operation::Delete, Some(("COMPONENT", id))) => {
243 self.process_component_spec_delete(id).await
244 }
245 (Operation::Put, Some(("LINKDEF", _id))) => {
246 debug!("ignoring deprecated LINKDEF put operation");
247 Ok(())
248 }
249 (Operation::Delete, Some(("LINKDEF", _id))) => {
250 debug!("ignoring deprecated LINKDEF delete operation");
251 Ok(())
252 }
253 (Operation::Put, Some(("CLAIMS", pubkey))) => {
254 self.process_claims_put(pubkey, value).await
255 }
256 (Operation::Delete, Some(("CLAIMS", pubkey))) => {
257 self.process_claims_delete(pubkey, value).await
258 }
259 (operation, Some(("REFMAP", id))) => {
260 debug!(?operation, id, "ignoring REFMAP entry");
262 Ok(())
263 }
264 _ => {
265 warn!(key, ?operation, "unsupported KV bucket entry");
266 Ok(())
267 }
268 };
269 if let Err(error) = &res {
270 error!(key, ?operation, ?error, "failed to process KV bucket entry");
271 }
272 }
273}
274
275#[instrument(level = "debug", skip_all)]
276pub(crate) async fn create_bucket(
277 jetstream: &async_nats::jetstream::Context,
278 bucket: &str,
279) -> anyhow::Result<Store> {
280 if let Ok(store) = jetstream.get_key_value(bucket).await {
282 info!(%bucket, "bucket already exists. Skipping creation.");
283 return Ok(store);
284 }
285
286 match jetstream
287 .create_key_value(async_nats::jetstream::kv::Config {
288 bucket: bucket.to_string(),
289 ..Default::default()
290 })
291 .await
292 {
293 Ok(store) => {
294 info!(%bucket, "created bucket with 1 replica");
295 Ok(store)
296 }
297 Err(err) => Err(anyhow!(err).context(format!("failed to create bucket '{bucket}'"))),
298 }
299}