1use std::{collections::HashMap, sync::Arc};
4
5use anyhow::{anyhow, ensure, Context as _};
6use async_nats::jetstream::kv::{Entry as KvEntry, Operation, Store};
7use bytes::Bytes;
8use futures::{StreamExt as _, TryStreamExt as _};
9use tokio::{
10 sync::watch::{self, Receiver},
11 task::JoinSet,
12};
13use tracing::{debug, error, instrument, warn};
14
15use crate::{
16 config::ConfigManager,
17 store::StoreManager,
18 wasmbus::{
19 claims::{Claims, StoredClaims},
20 ComponentSpecification,
21 },
22};
23
24#[async_trait::async_trait]
25impl StoreManager for Store {
26 #[instrument(level = "debug", skip(self))]
27 async fn get(&self, key: &str) -> anyhow::Result<Option<Bytes>> {
28 self.get(key)
29 .await
30 .map_err(|err| anyhow::anyhow!("Failed to get config: {}", err))
31 }
32
33 #[instrument(level = "debug", skip(self, value))]
34 async fn put(&self, key: &str, value: Bytes) -> anyhow::Result<()> {
35 self.put(key, value)
36 .await
37 .map(|_| ())
38 .map_err(|err| anyhow::anyhow!("Failed to set config: {}", err))
39 }
40
41 #[instrument(level = "debug", skip(self))]
42 async fn del(&self, key: &str) -> anyhow::Result<()> {
43 self.purge(key)
44 .await
45 .map_err(|err| anyhow::anyhow!("Failed to delete config: {}", err))
46 }
47}
48
49#[async_trait::async_trait]
50impl ConfigManager for Store {
51 async fn watch(&self, name: &str) -> anyhow::Result<Receiver<HashMap<String, String>>> {
54 let config: HashMap<String, String> = match self.get(name).await {
55 Ok(Some(data)) => serde_json::from_slice(&data)
56 .context("Data corruption error, unable to decode data from store")?,
57 Ok(None) => return Err(anyhow::anyhow!("Config {} does not exist", name)),
58 Err(e) => return Err(anyhow::anyhow!("Error fetching config {}: {}", name, e)),
59 };
60
61 let (tx, rx) = watch::channel(config);
62 let name = name.to_owned();
64 let mut watcher = self.watch(&name).await.context("Failed to watch config")?;
65
66 tokio::spawn(async move {
67 loop {
68 if tx.is_closed() {
69 warn!(%name, "config watch channel closed, aborting watch");
70 return;
71 }
72
73 match watcher.try_next().await {
74 Ok(Some(entry))
75 if matches!(entry.operation, Operation::Delete | Operation::Purge) =>
76 {
77 tx.send_replace(HashMap::new());
81 }
82 Ok(Some(entry)) => {
83 let config: HashMap<String, String> = match serde_json::from_slice(
84 &entry.value,
85 ) {
86 Ok(config) => config,
87 Err(e) => {
88 error!(%name, error = %e, "Error decoding config from store during watch");
89 continue;
90 }
91 };
92 tx.send_if_modified(|current| {
93 if current == &config {
94 false
95 } else {
96 *current = config;
97 true
98 }
99 });
100 }
101 Ok(None) => {
102 error!(%name, "Watcher for config has closed");
103 return;
104 }
105 Err(e) => {
106 error!(%name, error = %e, "Error reading from watcher for config. Will wait for next entry");
107 continue;
108 }
109 }
110 }
111 });
112
113 Ok(rx)
114 }
115}
116
117impl crate::wasmbus::Host {
119 #[instrument(level = "trace", skip_all)]
120 pub(crate) async fn process_entry(
121 &self,
122 KvEntry {
123 key,
124 value,
125 operation,
126 ..
127 }: KvEntry,
128 ) {
129 let key_id = key.split_once('_');
130 let res = match (operation, key_id) {
131 (Operation::Put, Some(("COMPONENT", id))) => {
132 self.process_component_spec_put(id, value).await
133 }
134 (Operation::Delete, Some(("COMPONENT", id))) => {
135 self.process_component_spec_delete(id).await
136 }
137 (Operation::Put, Some(("LINKDEF", _id))) => {
138 debug!("ignoring deprecated LINKDEF put operation");
139 Ok(())
140 }
141 (Operation::Delete, Some(("LINKDEF", _id))) => {
142 debug!("ignoring deprecated LINKDEF delete operation");
143 Ok(())
144 }
145 (Operation::Put, Some(("CLAIMS", pubkey))) => {
146 self.process_claims_put(pubkey, value).await
147 }
148 (Operation::Delete, Some(("CLAIMS", pubkey))) => {
149 self.process_claims_delete(pubkey, value).await
150 }
151 (operation, Some(("REFMAP", id))) => {
152 debug!(?operation, id, "ignoring REFMAP entry");
154 Ok(())
155 }
156 _ => {
157 warn!(key, ?operation, "unsupported KV bucket entry");
158 Ok(())
159 }
160 };
161 if let Err(error) = &res {
162 error!(key, ?operation, ?error, "failed to process KV bucket entry");
163 }
164 }
165
166 #[instrument(level = "debug", skip_all)]
167 pub(crate) async fn process_component_spec_put(
168 &self,
169 id: impl AsRef<str>,
170 value: impl AsRef<[u8]>,
171 ) -> anyhow::Result<()> {
172 let id = id.as_ref();
173 debug!(id, "process component spec put");
174
175 let spec: ComponentSpecification = serde_json::from_slice(value.as_ref())
176 .context("failed to deserialize component specification")?;
177 self.update_host_with_spec(id, &spec)
178 .await
179 .context("failed to update component spec")?;
180
181 Ok(())
182 }
183
184 #[instrument(level = "debug", skip_all)]
185 pub(crate) async fn process_component_spec_delete(
186 &self,
187 id: impl AsRef<str>,
188 ) -> anyhow::Result<()> {
189 debug!(id = id.as_ref(), "process component delete");
190 self.delete_component_spec(id).await
191 }
192
193 #[instrument(level = "debug", skip_all)]
194 pub(crate) async fn process_claims_put(
199 &self,
200 pubkey: impl AsRef<str>,
201 value: impl AsRef<[u8]>,
202 ) -> anyhow::Result<()> {
203 let pubkey = pubkey.as_ref();
204
205 debug!(pubkey, "process claim entry put");
206
207 let stored_claims: StoredClaims =
208 serde_json::from_slice(value.as_ref()).context("failed to decode stored claims")?;
209 let claims = Claims::from(stored_claims);
210
211 ensure!(claims.subject() == pubkey, "subject mismatch");
212 match claims {
213 Claims::Component(claims) => self.store_component_claims(claims).await,
214 Claims::Provider(claims) => self.store_provider_claims(claims).await,
215 }
216 }
217
218 #[instrument(level = "debug", skip_all)]
219 pub(crate) async fn process_claims_delete(
220 &self,
221 pubkey: impl AsRef<str>,
222 value: impl AsRef<[u8]>,
223 ) -> anyhow::Result<()> {
224 let pubkey = pubkey.as_ref();
225
226 debug!(pubkey, "process claim entry deletion");
227
228 let stored_claims: StoredClaims =
229 serde_json::from_slice(value.as_ref()).context("failed to decode stored claims")?;
230 let claims = Claims::from(stored_claims);
231
232 ensure!(claims.subject() == pubkey, "subject mismatch");
233
234 match claims {
235 Claims::Component(_) => self.delete_component_claims(pubkey).await,
236 Claims::Provider(_) => self.delete_provider_claims(pubkey).await,
237 }
238 }
239}
240
241pub async fn data_watch(
243 tasks: &mut JoinSet<anyhow::Result<()>>,
244 store: Store,
245 host: Arc<crate::wasmbus::Host>,
246) -> anyhow::Result<()> {
247 tasks.spawn({
248 let host = Arc::clone(&host);
249 let data = store.clone();
250 async move {
251 let data_watch = data
253 .watch_all()
254 .await
255 .context("failed to watch lattice data bucket")?;
256
257 data.keys()
259 .await
260 .context("failed to read keys of lattice data bucket")?
261 .map_err(|e| anyhow!(e).context("failed to read lattice data stream"))
262 .try_filter_map(|key| async {
263 data.entry(key)
264 .await
265 .context("failed to get entry in lattice data bucket")
266 })
267 .for_each(|entry| async {
268 match entry {
269 Ok(entry) => host.process_entry(entry).await,
270 Err(err) => {
271 error!(%err, "failed to read entry from lattice data bucket")
272 }
273 }
274 })
275 .await;
276 data_watch
277 .for_each({
278 let host = Arc::clone(&host);
279 move |entry| {
280 let host = Arc::clone(&host);
281 async move {
282 match entry {
283 Err(error) => {
284 error!("failed to watch lattice data bucket: {error}");
285 }
286 Ok(entry) => host.process_entry(entry).await,
287 }
288 }
289 }
290 })
291 .await;
292 let deadline = { *host.stop_rx.borrow() };
293 host.stop_tx.send_replace(deadline);
294 Ok(())
295 }
296 });
297
298 Ok(())
299}