1use anyhow::Context as _;
4use async_nats::jetstream::kv::Store;
5use bytes::Bytes;
6use futures::future::Either;
7use futures::stream::SelectAll;
8use futures::{Stream, StreamExt, TryFutureExt as _};
9use serde::Serialize;
10use std::pin::Pin;
11use std::sync::Arc;
12use std::task::{Context, Poll};
13use tokio::task::JoinSet;
14use tracing::{error, instrument, trace, warn};
15use wasmcloud_control_interface::CtlResponse;
16use wasmcloud_core::CTL_API_VERSION_1;
17use wasmcloud_tracing::context::TraceContextInjector;
18
19use crate::wasmbus::injector_to_headers;
20
21use super::store::data_watch;
22
23#[derive(Debug)]
24pub(crate) struct Queue {
25 all_streams: SelectAll<async_nats::Subscriber>,
26}
27
28impl Stream for Queue {
29 type Item = async_nats::Message;
30
31 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 self.all_streams.poll_next_unpin(cx)
33 }
34}
35
36impl Queue {
37 #[instrument]
38 pub(crate) async fn new(
39 nats: &async_nats::Client,
40 topic_prefix: &str,
41 lattice: &str,
42 host_id: &str,
43 component_auction: bool,
44 provider_auction: bool,
45 ) -> anyhow::Result<Self> {
46 let mut subs = vec![
47 Either::Left(nats.subscribe(format!(
48 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.registry.put",
49 ))),
50 Either::Left(nats.subscribe(format!(
51 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.host.ping",
52 ))),
53 Either::Right(nats.queue_subscribe(
54 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.link.*"),
55 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.link",),
56 )),
57 Either::Right(nats.queue_subscribe(
58 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.claims.get"),
59 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.claims"),
60 )),
61 Either::Left(nats.subscribe(format!(
62 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.component.*.{host_id}"
63 ))),
64 Either::Left(nats.subscribe(format!(
65 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.provider.*.{host_id}"
66 ))),
67 Either::Left(nats.subscribe(format!(
68 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.label.*.{host_id}"
69 ))),
70 Either::Left(nats.subscribe(format!(
71 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.host.*.{host_id}"
72 ))),
73 Either::Right(nats.queue_subscribe(
74 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.config.>"),
75 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.config"),
76 )),
77 ];
78 if component_auction {
79 subs.push(Either::Left(nats.subscribe(format!(
80 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.component.auction",
81 ))));
82 }
83 if provider_auction {
84 subs.push(Either::Left(nats.subscribe(format!(
85 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.provider.auction",
86 ))));
87 }
88 let streams = futures::future::join_all(subs)
89 .await
90 .into_iter()
91 .collect::<Result<Vec<_>, async_nats::SubscribeError>>()
92 .context("failed to subscribe to queues")?;
93 Ok(Self {
94 all_streams: futures::stream::select_all(streams),
95 })
96 }
97}
98
99impl crate::wasmbus::Host {
100 #[instrument(level = "trace", skip_all, fields(subject = %message.subject))]
101 pub(crate) async fn handle_ctl_message(
102 self: Arc<Self>,
103 message: async_nats::Message,
104 ctl_subject_prefix: &str,
105 ) -> Option<Bytes> {
106 opentelemetry_nats::attach_span_context(&message);
110 let subject = message.subject;
113 let mut parts = subject
114 .trim()
115 .trim_start_matches(ctl_subject_prefix)
116 .trim_start_matches('.')
117 .split('.')
118 .skip(2);
119 trace!(%subject, "handling control interface request");
120
121 let ctl_response = match (parts.next(), parts.next(), parts.next(), parts.next()) {
130 (Some("component"), Some("auction"), None, None) => self
132 .handle_auction_component(message.payload)
133 .await
134 .map(serialize_ctl_response),
135 (Some("component"), Some("scale"), Some(_host_id), None) => Arc::clone(&self)
136 .handle_scale_component(message.payload)
137 .await
138 .map(Some)
139 .map(serialize_ctl_response),
140 (Some("component"), Some("update"), Some(_host_id), None) => Arc::clone(&self)
141 .handle_update_component(message.payload)
142 .await
143 .map(Some)
144 .map(serialize_ctl_response),
145 (Some("provider"), Some("auction"), None, None) => self
147 .handle_auction_provider(message.payload)
148 .await
149 .map(serialize_ctl_response),
150 (Some("provider"), Some("start"), Some(_host_id), None) => Arc::clone(&self)
151 .handle_start_provider(message.payload)
152 .await
153 .map(serialize_ctl_response),
154 (Some("provider"), Some("stop"), Some(_host_id), None) => self
155 .handle_stop_provider(message.payload)
156 .await
157 .map(Some)
158 .map(serialize_ctl_response),
159 (Some("host"), Some("get"), Some(_host_id), None) => self
161 .handle_inventory()
162 .await
163 .map(Some)
164 .map(serialize_ctl_response),
165 (Some("host"), Some("ping"), None, None) => self
166 .handle_ping_hosts()
167 .await
168 .map(Some)
169 .map(serialize_ctl_response),
170 (Some("host"), Some("stop"), Some(host_id), None) => self
171 .handle_stop_host(message.payload, host_id)
172 .await
173 .map(Some)
174 .map(serialize_ctl_response),
175 (Some("claims"), Some("get"), None, None) => self
177 .handle_claims()
178 .await
179 .map(Some)
180 .map(serialize_ctl_response),
181 (Some("link"), Some("del"), None, None) => self
183 .handle_link_del(message.payload)
184 .await
185 .map(Some)
186 .map(serialize_ctl_response),
187 (Some("link"), Some("get"), None, None) => {
188 self.handle_links().await.map(|bytes| Some(Ok(bytes)))
190 }
191 (Some("link"), Some("put"), None, None) => self
192 .handle_link_put(message.payload)
193 .await
194 .map(Some)
195 .map(serialize_ctl_response),
196 (Some("label"), Some("del"), Some(host_id), None) => self
198 .handle_label_del(host_id, message.payload)
199 .await
200 .map(Some)
201 .map(serialize_ctl_response),
202 (Some("label"), Some("put"), Some(host_id), None) => self
203 .handle_label_put(host_id, message.payload)
204 .await
205 .map(Some)
206 .map(serialize_ctl_response),
207 (Some("registry"), Some("put"), None, None) => self
209 .handle_registries_put(message.payload)
210 .await
211 .map(Some)
212 .map(serialize_ctl_response),
213 (Some("config"), Some("get"), Some(config_name), None) => self
215 .handle_config_get(config_name)
216 .await
217 .map(|bytes| Some(Ok(bytes))),
218 (Some("config"), Some("put"), Some(config_name), None) => self
219 .handle_config_put(config_name, message.payload)
220 .await
221 .map(Some)
222 .map(serialize_ctl_response),
223 (Some("config"), Some("del"), Some(config_name), None) => self
224 .handle_config_delete(config_name)
225 .await
226 .map(Some)
227 .map(serialize_ctl_response),
228 _ => {
230 warn!(%subject, "received control interface request on unsupported subject");
231 Ok(serialize_ctl_response(Some(CtlResponse::error(
232 "unsupported subject",
233 ))))
234 }
235 };
236
237 if let Err(err) = &ctl_response {
238 error!(%subject, ?err, "failed to handle control interface request");
239 } else {
240 trace!(%subject, "handled control interface request");
241 }
242
243 match ctl_response {
244 Ok(Some(Ok(payload))) => Some(payload.into()),
245 Ok(None) => None,
247 Err(e) => Some(
248 serde_json::to_vec(&CtlResponse::error(&e.to_string()))
249 .context("failed to encode control interface response")
250 .unwrap_or_else(|_| format!(r#"{{"success":false,"error":"{e}"}}"#).into())
253 .into(),
254 ),
255 Ok(Some(Err(e))) => Some(
258 serde_json::to_vec(&CtlResponse::error(&e.to_string()))
259 .context("failed to encode control interface response")
260 .unwrap_or_else(|_| format!(r#"{{"success":false,"error":"{e}"}}"#).into())
261 .into(),
262 ),
263 }
264 }
265}
266
267pub struct NatsControlInterfaceServer {
270 ctl_nats: Arc<async_nats::Client>,
271 data_store: Store,
272 ctl_topic_prefix: String,
273 enable_component_auction: bool,
274 enable_provider_auction: bool,
275}
276
277impl NatsControlInterfaceServer {
278 pub fn new(
287 ctl_nats: async_nats::Client,
288 data_store: Store,
289 ctl_topic_prefix: String,
290 enable_component_auction: bool,
291 enable_provider_auction: bool,
292 ) -> Self {
293 Self {
294 ctl_nats: Arc::new(ctl_nats),
295 data_store,
296 ctl_topic_prefix,
297 enable_component_auction,
298 enable_provider_auction,
299 }
300 }
301
302 #[instrument(level = "trace", skip_all)]
303 pub async fn start(
306 self,
307 host: Arc<crate::wasmbus::Host>,
308 ) -> anyhow::Result<JoinSet<anyhow::Result<()>>> {
309 let queue = Queue::new(
310 &self.ctl_nats,
311 &self.ctl_topic_prefix,
312 host.lattice(),
313 &host.id(),
314 self.enable_component_auction,
315 self.enable_provider_auction,
316 )
317 .await
318 .context("failed to initialize queue")?;
319
320 let mut tasks = JoinSet::new();
321 data_watch(&mut tasks, self.data_store, host.clone())
322 .await
323 .context("failed to start data watch")?;
324
325 tasks.spawn({
326 let ctl_nats = Arc::clone(&self.ctl_nats);
327 let host = Arc::clone(&host);
328 let ctl_subject_prefix = Arc::new(self.ctl_topic_prefix.clone());
329 async move {
330 queue
331 .for_each_concurrent(None, {
332 let host = Arc::clone(&host);
333 let ctl_nats = Arc::clone(&ctl_nats);
334 let ctl_subject_prefix = Arc::clone(&ctl_subject_prefix);
335 move |msg| {
336 let host = Arc::clone(&host);
337 let ctl_nats = Arc::clone(&ctl_nats);
338 let ctl_subject_prefix = Arc::clone(&ctl_subject_prefix);
339 async move {
340 let msg_subject = msg.subject.clone();
341 let msg_reply = msg.reply.clone();
342 let payload = host.handle_ctl_message(msg, &ctl_subject_prefix).await;
343 if let Some(reply) = msg_reply {
344 let headers = injector_to_headers(&TraceContextInjector::default_with_span());
345 if let Some(payload) = payload {
346 let max_payload = ctl_nats.server_info().max_payload;
347 if payload.len() > max_payload {
348 warn!(
349 size = payload.len(),
350 max_size = max_payload,
351 "ctl response payload is too large to publish and may fail",
352 );
353 }
354 if let Err(err) =
355 ctl_nats
356 .publish_with_headers(reply.clone(), headers, payload)
357 .err_into::<anyhow::Error>()
358 .and_then(|()| ctl_nats.flush().err_into::<anyhow::Error>())
359 .await
360 {
361 tracing::error!(%msg_subject, ?err, "failed to publish reply to control interface request");
362 }
363 }
364 }
365 }
366 }
367 })
368 .await;
369
370 let deadline = { *host.stop_rx.borrow() };
371 host.stop_tx.send_replace(deadline);
372 Ok(())
373 }
374 });
375
376 Ok(tasks)
377 }
378}
379
380fn serialize_ctl_response<T: Serialize>(
382 ctl_response: Option<CtlResponse<T>>,
383) -> Option<anyhow::Result<Vec<u8>>> {
384 ctl_response.map(|resp| serde_json::to_vec(&resp).map_err(anyhow::Error::from))
385}