wasmcloud_host/nats/
ctl.rs

1//! The NATS implementation of the control interface.
2
3use anyhow::Context as _;
4use async_nats::jetstream::kv::Store;
5use bytes::Bytes;
6use futures::future::Either;
7use futures::stream::SelectAll;
8use futures::{Stream, StreamExt, TryFutureExt as _};
9use serde::Serialize;
10use std::pin::Pin;
11use std::sync::Arc;
12use std::task::{Context, Poll};
13use tokio::task::JoinSet;
14use tracing::{error, instrument, trace, warn};
15use wasmcloud_control_interface::CtlResponse;
16use wasmcloud_core::CTL_API_VERSION_1;
17use wasmcloud_tracing::context::TraceContextInjector;
18
19use crate::wasmbus::injector_to_headers;
20
21use super::store::data_watch;
22
23#[derive(Debug)]
24pub(crate) struct Queue {
25    all_streams: SelectAll<async_nats::Subscriber>,
26}
27
28impl Stream for Queue {
29    type Item = async_nats::Message;
30
31    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32        self.all_streams.poll_next_unpin(cx)
33    }
34}
35
36impl Queue {
37    #[instrument]
38    pub(crate) async fn new(
39        nats: &async_nats::Client,
40        topic_prefix: &str,
41        lattice: &str,
42        host_id: &str,
43        component_auction: bool,
44        provider_auction: bool,
45    ) -> anyhow::Result<Self> {
46        let mut subs = vec![
47            Either::Left(nats.subscribe(format!(
48                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.registry.put",
49            ))),
50            Either::Left(nats.subscribe(format!(
51                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.host.ping",
52            ))),
53            Either::Right(nats.queue_subscribe(
54                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.link.*"),
55                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.link",),
56            )),
57            Either::Right(nats.queue_subscribe(
58                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.claims.get"),
59                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.claims"),
60            )),
61            Either::Left(nats.subscribe(format!(
62                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.component.*.{host_id}"
63            ))),
64            Either::Left(nats.subscribe(format!(
65                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.provider.*.{host_id}"
66            ))),
67            Either::Left(nats.subscribe(format!(
68                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.label.*.{host_id}"
69            ))),
70            Either::Left(nats.subscribe(format!(
71                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.host.*.{host_id}"
72            ))),
73            Either::Right(nats.queue_subscribe(
74                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.config.>"),
75                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.config"),
76            )),
77        ];
78        if component_auction {
79            subs.push(Either::Left(nats.subscribe(format!(
80                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.component.auction",
81            ))));
82        }
83        if provider_auction {
84            subs.push(Either::Left(nats.subscribe(format!(
85                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.provider.auction",
86            ))));
87        }
88        let streams = futures::future::join_all(subs)
89            .await
90            .into_iter()
91            .collect::<Result<Vec<_>, async_nats::SubscribeError>>()
92            .context("failed to subscribe to queues")?;
93        Ok(Self {
94            all_streams: futures::stream::select_all(streams),
95        })
96    }
97}
98
99impl crate::wasmbus::Host {
100    #[instrument(level = "trace", skip_all, fields(subject = %message.subject))]
101    pub(crate) async fn handle_ctl_message(
102        self: Arc<Self>,
103        message: async_nats::Message,
104        ctl_subject_prefix: &str,
105    ) -> Option<Bytes> {
106        // NOTE: if log level is not `trace`, this won't have an effect, since the current span is
107        // disabled. In most cases that's fine, since we aren't aware of any control interface
108        // requests including a trace context
109        opentelemetry_nats::attach_span_context(&message);
110        // Skip the topic prefix, the version, and the lattice
111        // e.g. `wasmbus.ctl.v1.{prefix}`
112        let subject = message.subject;
113        let mut parts = subject
114            .trim()
115            .trim_start_matches(ctl_subject_prefix)
116            .trim_start_matches('.')
117            .split('.')
118            .skip(2);
119        trace!(%subject, "handling control interface request");
120
121        // This response is a wrapped Result<Option<Result<Vec<u8>>>> for a good reason.
122        // The outer Result is for reporting protocol errors in handling the request, e.g. failing to
123        //    deserialize the request payload.
124        // The Option is for the case where the request is handled successfully, but the handler
125        //    doesn't want to send a response back to the client, like with an auction.
126        // The inner Result is purely for the success or failure of serializing the [CtlResponse], which
127        //    should never fail but it's a result we must handle.
128        // And finally, the Vec<u8> is the serialized [CtlResponse] that we'll send back to the client
129        let ctl_response = match (parts.next(), parts.next(), parts.next(), parts.next()) {
130            // Component commands
131            (Some("component"), Some("auction"), None, None) => self
132                .handle_auction_component(message.payload)
133                .await
134                .map(serialize_ctl_response),
135            (Some("component"), Some("scale"), Some(_host_id), None) => Arc::clone(&self)
136                .handle_scale_component(message.payload)
137                .await
138                .map(Some)
139                .map(serialize_ctl_response),
140            (Some("component"), Some("update"), Some(_host_id), None) => Arc::clone(&self)
141                .handle_update_component(message.payload)
142                .await
143                .map(Some)
144                .map(serialize_ctl_response),
145            // Provider commands
146            (Some("provider"), Some("auction"), None, None) => self
147                .handle_auction_provider(message.payload)
148                .await
149                .map(serialize_ctl_response),
150            (Some("provider"), Some("start"), Some(_host_id), None) => Arc::clone(&self)
151                .handle_start_provider(message.payload)
152                .await
153                .map(serialize_ctl_response),
154            (Some("provider"), Some("stop"), Some(_host_id), None) => self
155                .handle_stop_provider(message.payload)
156                .await
157                .map(Some)
158                .map(serialize_ctl_response),
159            // Host commands
160            (Some("host"), Some("get"), Some(_host_id), None) => self
161                .handle_inventory()
162                .await
163                .map(Some)
164                .map(serialize_ctl_response),
165            (Some("host"), Some("ping"), None, None) => self
166                .handle_ping_hosts()
167                .await
168                .map(Some)
169                .map(serialize_ctl_response),
170            (Some("host"), Some("stop"), Some(host_id), None) => self
171                .handle_stop_host(message.payload, host_id)
172                .await
173                .map(Some)
174                .map(serialize_ctl_response),
175            // Claims commands
176            (Some("claims"), Some("get"), None, None) => self
177                .handle_claims()
178                .await
179                .map(Some)
180                .map(serialize_ctl_response),
181            // Link commands
182            (Some("link"), Some("del"), None, None) => self
183                .handle_link_del(message.payload)
184                .await
185                .map(Some)
186                .map(serialize_ctl_response),
187            (Some("link"), Some("get"), None, None) => {
188                // Explicitly returning a Vec<u8> for non-cloning efficiency within handle_links
189                self.handle_links().await.map(|bytes| Some(Ok(bytes)))
190            }
191            (Some("link"), Some("put"), None, None) => self
192                .handle_link_put(message.payload)
193                .await
194                .map(Some)
195                .map(serialize_ctl_response),
196            // Label commands
197            (Some("label"), Some("del"), Some(host_id), None) => self
198                .handle_label_del(host_id, message.payload)
199                .await
200                .map(Some)
201                .map(serialize_ctl_response),
202            (Some("label"), Some("put"), Some(host_id), None) => self
203                .handle_label_put(host_id, message.payload)
204                .await
205                .map(Some)
206                .map(serialize_ctl_response),
207            // Registry commands
208            (Some("registry"), Some("put"), None, None) => self
209                .handle_registries_put(message.payload)
210                .await
211                .map(Some)
212                .map(serialize_ctl_response),
213            // Config commands
214            (Some("config"), Some("get"), Some(config_name), None) => self
215                .handle_config_get(config_name)
216                .await
217                .map(|bytes| Some(Ok(bytes))),
218            (Some("config"), Some("put"), Some(config_name), None) => self
219                .handle_config_put(config_name, message.payload)
220                .await
221                .map(Some)
222                .map(serialize_ctl_response),
223            (Some("config"), Some("del"), Some(config_name), None) => self
224                .handle_config_delete(config_name)
225                .await
226                .map(Some)
227                .map(serialize_ctl_response),
228            // Topic fallback
229            _ => {
230                warn!(%subject, "received control interface request on unsupported subject");
231                Ok(serialize_ctl_response(Some(CtlResponse::error(
232                    "unsupported subject",
233                ))))
234            }
235        };
236
237        if let Err(err) = &ctl_response {
238            error!(%subject, ?err, "failed to handle control interface request");
239        } else {
240            trace!(%subject, "handled control interface request");
241        }
242
243        match ctl_response {
244            Ok(Some(Ok(payload))) => Some(payload.into()),
245            // No response from the host (e.g. auctioning provider)
246            Ok(None) => None,
247            Err(e) => Some(
248                serde_json::to_vec(&CtlResponse::error(&e.to_string()))
249                    .context("failed to encode control interface response")
250                    // This should never fail to serialize, but the fallback ensures that we send
251                    // something back to the client even if we somehow fail.
252                    .unwrap_or_else(|_| format!(r#"{{"success":false,"error":"{e}"}}"#).into())
253                    .into(),
254            ),
255            // This would only occur if we failed to serialize a valid CtlResponse. This is
256            // programmer error.
257            Ok(Some(Err(e))) => Some(
258                serde_json::to_vec(&CtlResponse::error(&e.to_string()))
259                    .context("failed to encode control interface response")
260                    .unwrap_or_else(|_| format!(r#"{{"success":false,"error":"{e}"}}"#).into())
261                    .into(),
262            ),
263        }
264    }
265}
266
267/// A control interface server that receives messages on the NATS message bus and
268/// dispatches them to the host for processing.
269pub struct NatsControlInterfaceServer {
270    ctl_nats: Arc<async_nats::Client>,
271    data_store: Store,
272    ctl_topic_prefix: String,
273    enable_component_auction: bool,
274    enable_provider_auction: bool,
275}
276
277impl NatsControlInterfaceServer {
278    /// Create a new NATS control interface server.
279    ///
280    /// # Arguments
281    /// * `ctl_nats` - The NATS client to use for sending and receiving messages.
282    /// * `data_store` - The JetStream KV bucket where ComponentSpecs are stored.
283    /// * `ctl_topic_prefix` - The topic prefix to use for control interface messages.
284    /// * `enable_component_auction` - Whether to enable component auctioning.
285    /// * `enable_provider_auction` - Whether to enable provider auctioning.
286    pub fn new(
287        ctl_nats: async_nats::Client,
288        data_store: Store,
289        ctl_topic_prefix: String,
290        enable_component_auction: bool,
291        enable_provider_auction: bool,
292    ) -> Self {
293        Self {
294            ctl_nats: Arc::new(ctl_nats),
295            data_store,
296            ctl_topic_prefix,
297            enable_component_auction,
298            enable_provider_auction,
299        }
300    }
301
302    #[instrument(level = "trace", skip_all)]
303    /// Start the control interface server, returning a JoinSet of tasks.
304    /// This will start the NATS subscriber and the data watch tasks.
305    pub async fn start(
306        self,
307        host: Arc<crate::wasmbus::Host>,
308    ) -> anyhow::Result<JoinSet<anyhow::Result<()>>> {
309        let queue = Queue::new(
310            &self.ctl_nats,
311            &self.ctl_topic_prefix,
312            host.lattice(),
313            &host.id(),
314            self.enable_component_auction,
315            self.enable_provider_auction,
316        )
317        .await
318        .context("failed to initialize queue")?;
319
320        let mut tasks = JoinSet::new();
321        data_watch(&mut tasks, self.data_store, host.clone())
322            .await
323            .context("failed to start data watch")?;
324
325        tasks.spawn({
326            let ctl_nats = Arc::clone(&self.ctl_nats);
327            let host = Arc::clone(&host);
328            let ctl_subject_prefix = Arc::new(self.ctl_topic_prefix.clone());
329            async move {
330                queue
331                    .for_each_concurrent(None, {
332                        let host = Arc::clone(&host);
333                        let ctl_nats = Arc::clone(&ctl_nats);
334                        let ctl_subject_prefix = Arc::clone(&ctl_subject_prefix);
335                        move |msg| {
336                            let host = Arc::clone(&host);
337                            let ctl_nats = Arc::clone(&ctl_nats);
338                            let ctl_subject_prefix = Arc::clone(&ctl_subject_prefix);
339                            async move {
340                                let msg_subject = msg.subject.clone();
341                                let msg_reply = msg.reply.clone();
342                                let payload = host.handle_ctl_message(msg, &ctl_subject_prefix).await;
343                                if let Some(reply) = msg_reply {
344                                    let headers = injector_to_headers(&TraceContextInjector::default_with_span());
345                                    if let Some(payload) = payload {
346                                        let max_payload = ctl_nats.server_info().max_payload;
347                                        if payload.len() > max_payload {
348                                            warn!(
349                                                size = payload.len(),
350                                                max_size = max_payload,
351                                                "ctl response payload is too large to publish and may fail",
352                                            );
353                                        }
354                                        if let Err(err) =
355                                            ctl_nats
356                                            .publish_with_headers(reply.clone(), headers, payload)
357                                            .err_into::<anyhow::Error>()
358                                            .and_then(|()| ctl_nats.flush().err_into::<anyhow::Error>())
359                                            .await
360                                        {
361                                            tracing::error!(%msg_subject, ?err, "failed to publish reply to control interface request");
362                                        }
363                                    }
364                                }
365                            }
366                        }
367                    })
368                    .await;
369
370                let deadline = { *host.stop_rx.borrow() };
371                host.stop_tx.send_replace(deadline);
372                Ok(())
373            }
374        });
375
376        Ok(tasks)
377    }
378}
379
380/// Helper function to serialize `CtlResponse`<T> into a Vec<u8> if the response is Some
381fn serialize_ctl_response<T: Serialize>(
382    ctl_response: Option<CtlResponse<T>>,
383) -> Option<anyhow::Result<Vec<u8>>> {
384    ctl_response.map(|resp| serde_json::to_vec(&resp).map_err(anyhow::Error::from))
385}