wasmcloud_host/wasmbus/
ctl.rs

1use core::sync::atomic::Ordering;
2
3use std::collections::btree_map::Entry as BTreeMapEntry;
4use std::collections::{hash_map, HashMap};
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{anyhow, bail, Context as _};
9use bytes::Bytes;
10use futures::join;
11use serde_json::json;
12use tokio::spawn;
13use tokio::time::Instant;
14use tracing::{debug, error, info, instrument, trace, warn};
15use wasmcloud_control_interface::{
16    ComponentAuctionAck, ComponentAuctionRequest, CtlResponse,
17    DeleteInterfaceLinkDefinitionRequest, HostInventory, HostLabel, HostLabelIdentifier, Link,
18    ProviderAuctionAck, ProviderAuctionRequest, RegistryCredential, ScaleComponentCommand,
19    StartProviderCommand, StopHostCommand, StopProviderCommand, UpdateComponentCommand,
20};
21use wasmcloud_core::shutdown_subject;
22use wasmcloud_tracing::context::TraceContextInjector;
23
24use crate::registry::RegistryCredentialExt;
25use crate::wasmbus::{
26    human_friendly_uptime, injector_to_headers, Annotations, Claims, Host, Provider, StoredClaims,
27};
28use crate::ResourceRef;
29
/// Implementation for the server-side handling of control interface requests.
///
/// Every method handles one kind of control interface request. Most methods return a
/// typed [`CtlResponse`]; a few return the already serialized response as raw bytes, and the
/// auction and start-provider handlers may return `Ok(None)` to indicate that no response
/// should be produced at all.
///
/// This trait is not a part of the `wasmcloud_control_interface` crate yet to allow
/// for the initial implementation to be done in the `wasmcloud_host` (pre 1.0) crate. This
/// will likely move to that crate in the future.
#[async_trait::async_trait]
pub trait ControlInterfaceServer: Send + Sync {
    /// Handle an auction request for a component. This method should return `Ok(None)` if the host
    /// does not want to respond to the auction request.
    async fn handle_auction_component(
        &self,
        request: ComponentAuctionRequest,
    ) -> anyhow::Result<Option<CtlResponse<ComponentAuctionAck>>>;
    /// Handle an auction request for a provider. This method should return `Ok(None)` if the host
    /// does not want to respond to the auction request.
    async fn handle_auction_provider(
        &self,
        request: ProviderAuctionRequest,
    ) -> anyhow::Result<Option<CtlResponse<ProviderAuctionAck>>>;

    /// Handle a request to stop the host. This method should return a response indicating success
    /// or failure.
    async fn handle_stop_host(&self, request: StopHostCommand) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to scale a component. This method should return a response indicating success
    /// or failure.
    ///
    /// The receiver is an `Arc<Self>` so that an implementation can move a handle to itself
    /// into a background task (the `Host` implementation does so).
    async fn handle_scale_component(
        self: Arc<Self>,
        request: ScaleComponentCommand,
    ) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to update a component. This method should return a response indicating success
    /// or failure.
    ///
    /// The receiver is an `Arc<Self>` so that an implementation can move a handle to itself
    /// into a background task (the `Host` implementation does so).
    async fn handle_update_component(
        self: Arc<Self>,
        request: UpdateComponentCommand,
    ) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to start a provider. This method should return a response indicating success
    /// or failure, or `Ok(None)` if the host does not want to respond to the request (the
    /// `Host` implementation does this for builtin providers that are not enabled).
    async fn handle_start_provider(
        self: Arc<Self>,
        request: StartProviderCommand,
    ) -> anyhow::Result<Option<CtlResponse<()>>>;

    /// Handle a request to stop a provider. This method should return a response indicating success
    /// or failure.
    async fn handle_stop_provider(
        &self,
        request: StopProviderCommand,
    ) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to get the host inventory. This method should return a response containing
    /// the host inventory.
    async fn handle_inventory(&self) -> anyhow::Result<CtlResponse<HostInventory>>;

    /// Handle a request to get the claims for all components and providers. This method should return
    /// a response containing the claims, one string-to-string map per claims entry.
    async fn handle_claims(&self) -> anyhow::Result<CtlResponse<Vec<HashMap<String, String>>>>;

    /// Handle a request to get the links for all components. This method should return a response containing
    /// the links.
    ///
    /// The response is returned already serialized as bytes (the `Host` implementation encodes
    /// a `CtlResponse` as JSON).
    async fn handle_links(&self) -> anyhow::Result<Vec<u8>>;

    /// Handle a request to get the configuration for a specific key. This method should return a response
    /// containing the configuration.
    ///
    /// The response is returned already serialized as bytes (the `Host` implementation encodes
    /// a `CtlResponse` as JSON).
    async fn handle_config_get(&self, config_name: &str) -> anyhow::Result<Vec<u8>>;

    /// Handle a request to delete the configuration for a specific key. This method should return a response
    /// indicating success or failure.
    async fn handle_config_delete(&self, config_name: &str) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to put a label on the host. This method should return a response indicating success
    /// or failure.
    ///
    /// `host_id` is the ID of the host the request is addressed to (the `Host` implementation
    /// includes it in the `labels_changed` event it publishes).
    async fn handle_label_put(
        &self,
        request: HostLabel,
        host_id: &str,
    ) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to delete a label from the host. This method should return a response indicating success
    /// or failure.
    ///
    /// `host_id` is the ID of the host the request is addressed to (the `Host` implementation
    /// includes it in the `labels_changed` event it publishes).
    async fn handle_label_del(
        &self,
        request: HostLabelIdentifier,
        host_id: &str,
    ) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to put a link on a component. This method should return a response indicating success
    /// or failure.
    async fn handle_link_put(&self, request: Link) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to delete a link from a component. This method should return a response indicating success
    /// or failure.
    async fn handle_link_del(
        &self,
        request: DeleteInterfaceLinkDefinitionRequest,
    ) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to put registry credentials. This method should return a response indicating success
    /// or failure.
    ///
    /// NOTE(review): the map key is presumably the registry the credential applies to —
    /// confirm against the implementation.
    async fn handle_registries_put(
        &self,
        request: HashMap<String, RegistryCredential>,
    ) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to put configuration data. This method should return a response indicating success
    /// or failure.
    ///
    /// `config_name` names the configuration entry and `data` carries its raw payload.
    async fn handle_config_put(
        &self,
        config_name: &str,
        data: Bytes,
    ) -> anyhow::Result<CtlResponse<()>>;

    /// Handle a request to ping all hosts in the lattice. This method should return a response containing
    /// the host data.
    async fn handle_ping_hosts(
        &self,
    ) -> anyhow::Result<CtlResponse<wasmcloud_control_interface::Host>>;
}
150
151#[async_trait::async_trait]
152impl ControlInterfaceServer for Host {
153    #[instrument(level = "debug", skip_all)]
154    async fn handle_auction_component(
155        &self,
156        request: ComponentAuctionRequest,
157    ) -> anyhow::Result<Option<CtlResponse<ComponentAuctionAck>>> {
158        let component_ref = request.component_ref();
159        let component_id = request.component_id();
160        let constraints = request.constraints();
161
162        info!(
163            component_ref,
164            component_id,
165            ?constraints,
166            "handling auction for component"
167        );
168
169        let host_labels = self.labels.read().await;
170        let constraints_satisfied = constraints
171            .iter()
172            .all(|(k, v)| host_labels.get(k).is_some_and(|hv| hv == v));
173        let component_id_running = self.components.read().await.contains_key(component_id);
174
175        // This host can run the component if all constraints are satisfied and the component is not already running
176        if constraints_satisfied && !component_id_running {
177            Ok(Some(CtlResponse::ok(
178                ComponentAuctionAck::from_component_host_and_constraints(
179                    component_ref,
180                    component_id,
181                    &self.host_key.public_key(),
182                    constraints.clone(),
183                ),
184            )))
185        } else {
186            Ok(None)
187        }
188    }
189
190    #[instrument(level = "debug", skip_all)]
191    async fn handle_auction_provider(
192        &self,
193        request: ProviderAuctionRequest,
194    ) -> anyhow::Result<Option<CtlResponse<ProviderAuctionAck>>> {
195        let provider_ref = request.provider_ref();
196        let provider_id = request.provider_id();
197        let constraints = request.constraints();
198
199        info!(
200            provider_ref,
201            provider_id,
202            ?constraints,
203            "handling auction for provider"
204        );
205
206        let host_labels = self.labels.read().await;
207        let constraints_satisfied = constraints
208            .iter()
209            .all(|(k, v)| host_labels.get(k).is_some_and(|hv| hv == v));
210        let providers = self.providers.read().await;
211        let provider_running = providers.contains_key(provider_id);
212        if constraints_satisfied && !provider_running {
213            Ok(Some(CtlResponse::ok(
214                ProviderAuctionAck::builder()
215                    .provider_ref(provider_ref.into())
216                    .provider_id(provider_id.into())
217                    .constraints(constraints.clone())
218                    .host_id(self.host_key.public_key())
219                    .build()
220                    .map_err(|e| anyhow!("failed to build provider auction ack: {e}"))?,
221            )))
222        } else {
223            Ok(None)
224        }
225    }
226
227    #[instrument(level = "debug", skip_all)]
228    async fn handle_stop_host(&self, request: StopHostCommand) -> anyhow::Result<CtlResponse<()>> {
229        let timeout = request.timeout();
230
231        info!(?timeout, "handling stop host");
232
233        self.ready.store(false, Ordering::Relaxed);
234        self.heartbeat.abort();
235        let deadline =
236            timeout.and_then(|timeout| Instant::now().checked_add(Duration::from_millis(timeout)));
237        self.stop_tx.send_replace(deadline);
238
239        Ok(CtlResponse::<()>::success(
240            "successfully handled stop host".into(),
241        ))
242    }
    /// Handles a scale-component request for this host.
    ///
    /// The request is checked against the currently running components and a success response
    /// is returned right away. Fetching the component, scaling it and the optional follow-up
    /// update all happen in a spawned background task, so the response does not reflect the
    /// outcome of the scaling itself. A failed scale (or invalid embedded claims) is reported
    /// through a `component_scale_failed` event; a failed follow-up update is only logged.
    ///
    /// The response message is empty unless the component is already running with a different
    /// image reference. In that case the message states whether the reference will be updated
    /// after scaling (`allow_update` set) or left untouched.
    #[instrument(level = "debug", skip_all)]
    async fn handle_scale_component(
        self: Arc<Self>,
        request: ScaleComponentCommand,
    ) -> anyhow::Result<CtlResponse<()>> {
        let component_ref = request.component_ref();
        let component_id = request.component_id();
        let annotations = request.annotations();
        let max_instances = request.max_instances();
        let component_limits = request.component_limits();
        let config = request.config().clone();
        let allow_update = request.allow_update();
        let host_id = request.host_id();

        debug!(
            component_ref,
            max_instances, component_id, "handling scale component"
        );

        // Take owned copies of the borrowed request data so it can be moved into the task below.
        let host_id = host_id.to_string();
        let annotations: Annotations = annotations
            .cloned()
            .unwrap_or_default()
            .into_iter()
            .collect();

        // Basic validation to ensure that the component is running and that the image reference matches
        // If it doesn't match, we can still successfully scale, but we won't be updating the image reference
        // `(None, false)` means that no component with this ID is currently running.
        let (original_ref, ref_changed) = {
            self.components
                .read()
                .await
                .get(component_id)
                .map(|v| {
                    (
                        Some(Arc::clone(&v.image_reference)),
                        &*v.image_reference != component_ref,
                    )
                })
                .unwrap_or_else(|| (None, false))
        };

        // Set when the scale must be followed by an update to the new image reference.
        let mut perform_post_update: bool = false;
        let message = match (allow_update, original_ref, ref_changed) {
            // Updates are not allowed, original ref changed
            (false, Some(original_ref), true) => {
                let msg = format!(
                "Requested to scale existing component to a different image reference: {original_ref} != {component_ref}. The component will be scaled but the image reference will not be updated. If you meant to update this component to a new image ref, use the update command."
            );
                warn!(msg);
                msg
            }
            // Updates are allowed, ref changed and we'll do an update later
            (true, Some(original_ref), true) => {
                perform_post_update = true;
                format!(
                "Requested to scale existing component, with a changed image reference: {original_ref} != {component_ref}. The component will be scaled, and the image reference will be updated afterwards."
            )
            }
            // Component not running yet, or reference unchanged: nothing to tell the caller.
            _ => String::with_capacity(0),
        };

        let component_id = Arc::from(component_id);
        let component_ref = Arc::from(component_ref);
        // Spawn a task to perform the scaling and possibly an update of the component afterwards
        spawn(async move {
            // Fetch the component from the reference
            let component_and_claims =
                self.fetch_component(&component_ref)
                    .await
                    .map(|component_bytes| {
                        // Pull the claims token from the component, this returns an error only if claims are embedded
                        // and they are invalid (expired, tampered with, etc)
                        let claims_token =
                            wasmcloud_runtime::component::claims_token(&component_bytes);
                        (component_bytes, claims_token)
                    });
            // Three outcomes: fetched with usable claims, fetched with invalid claims (abort),
            // or fetch failed (the error is handed to the scale task below).
            let (wasm, claims_token, retrieval_error) = match component_and_claims {
                Ok((wasm, Ok(claims_token))) => (Some(wasm), claims_token, None),
                Ok((_, Err(e))) => {
                    // Invalid embedded claims: report the failure and give up without scaling.
                    if let Err(e) = self
                        .event_publisher
                        .publish_event(
                            "component_scale_failed",
                            crate::event::component_scale_failed(
                                None,
                                &annotations,
                                host_id,
                                &component_ref,
                                &component_id,
                                max_instances,
                                &e,
                            ),
                        )
                        .await
                    {
                        error!(%component_ref, %component_id, err = ?e, "failed to publish component scale failed event");
                    }
                    return;
                }
                Err(e) => (None, None, Some(e)),
            };
            // Scale the component
            if let Err(e) = self
                .handle_scale_component_task(
                    Arc::clone(&component_ref),
                    Arc::clone(&component_id),
                    &host_id,
                    max_instances,
                    component_limits,
                    &annotations,
                    config,
                    // Pass the binary, or the fetch error when no binary could be retrieved.
                    wasm.ok_or_else(|| {
                        retrieval_error.unwrap_or_else(|| anyhow!("unexpected missing wasm binary"))
                    }),
                    claims_token.as_ref(),
                )
                .await
            {
                error!(%component_ref, %component_id, err = ?e, "failed to scale component");
                if let Err(e) = self
                    .event_publisher
                    .publish_event(
                        "component_scale_failed",
                        crate::event::component_scale_failed(
                            claims_token.map(|c| c.claims).as_ref(),
                            &annotations,
                            host_id,
                            &component_ref,
                            &component_id,
                            max_instances,
                            &e,
                        ),
                    )
                    .await
                {
                    error!(%component_ref, %component_id, err = ?e, "failed to publish component scale failed event");
                }
                return;
            }

            // Scaling succeeded; switch to the new image reference if that was requested.
            if perform_post_update {
                if let Err(e) = self
                    .handle_update_component_task(
                        Arc::clone(&component_id),
                        Arc::clone(&component_ref),
                        &host_id,
                        None,
                    )
                    .await
                {
                    error!(%component_ref, %component_id, err = ?e, "failed to update component after scale");
                }
            }
        });

        // The response is sent without waiting for the spawned task to finish.
        Ok(CtlResponse::<()>::success(message))
    }
401
402    // TODO(#1548): With component IDs, new component references, configuration, etc, we're going to need to do some
403    // design thinking around how update component should work. Should it be limited to a single host or latticewide?
404    // Should it also update configuration, or is that separate? Should scaling be done via an update?
405    #[instrument(level = "debug", skip_all)]
406    async fn handle_update_component(
407        self: Arc<Self>,
408        request: UpdateComponentCommand,
409    ) -> anyhow::Result<CtlResponse<()>> {
410        let component_id = request.component_id();
411        let annotations = request.annotations().cloned();
412        let new_component_ref = request.new_component_ref();
413        let host_id = request.host_id();
414
415        debug!(
416            component_id,
417            new_component_ref,
418            ?annotations,
419            "handling update component"
420        );
421
422        // Find the component and extract the image reference
423        #[allow(clippy::map_clone)]
424        // NOTE: clippy thinks, that we can just replace the `.map` below by
425        // `.cloned` - we can't, because we need to clone the field
426        let Some(component_ref) = self
427            .components
428            .read()
429            .await
430            .get(component_id)
431            .map(|component| Arc::clone(&component.image_reference))
432        else {
433            return Ok(CtlResponse::error(&format!(
434                "component {component_id} not found"
435            )));
436        };
437
438        // If the component image reference is the same, respond with an appropriate message
439        if &*component_ref == new_component_ref {
440            return Ok(CtlResponse::<()>::success(format!(
441                "component {component_id} already updated to {new_component_ref}"
442            )));
443        }
444
445        let host_id = host_id.to_string();
446        let message = format!(
447            "component {component_id} updating from {component_ref} to {new_component_ref}"
448        );
449        let component_id = Arc::from(component_id);
450        let new_component_ref = Arc::from(new_component_ref);
451        spawn(async move {
452            if let Err(e) = self
453                .handle_update_component_task(
454                    Arc::clone(&component_id),
455                    Arc::clone(&new_component_ref),
456                    &host_id,
457                    annotations,
458                )
459                .await
460            {
461                error!(%new_component_ref, %component_id, err = ?e, "failed to update component");
462            }
463        });
464
465        Ok(CtlResponse::<()>::success(message))
466    }
467
468    #[instrument(level = "debug", skip_all)]
469    async fn handle_start_provider(
470        self: Arc<Self>,
471        request: StartProviderCommand,
472    ) -> anyhow::Result<Option<CtlResponse<()>>> {
473        if self
474            .providers
475            .read()
476            .await
477            .contains_key(request.provider_id())
478        {
479            return Ok(Some(CtlResponse::error(
480                "provider with that ID is already running",
481            )));
482        }
483
484        // Avoid responding to start providers for builtin providers if they're not enabled
485        if let Ok(ResourceRef::Builtin(name)) = ResourceRef::try_from(request.provider_ref()) {
486            if !self.experimental_features.builtin_http_server && name == "http-server" {
487                debug!(
488                    provider_ref = request.provider_ref(),
489                    provider_id = request.provider_id(),
490                    "skipping start provider for disabled builtin http provider"
491                );
492                return Ok(None);
493            }
494            if !self.experimental_features.builtin_messaging_nats && name == "messaging-nats" {
495                debug!(
496                    provider_ref = request.provider_ref(),
497                    provider_id = request.provider_id(),
498                    "skipping start provider for disabled builtin messaging provider"
499                );
500                return Ok(None);
501            }
502        }
503
504        // NOTE: We log at info since starting providers can take a while
505        info!(
506            provider_ref = request.provider_ref(),
507            provider_id = request.provider_id(),
508            "handling start provider"
509        );
510
511        let host_id = request.host_id().to_string();
512        spawn(async move {
513            let config = request.config();
514            let provider_id = request.provider_id();
515            let provider_ref = request.provider_ref();
516            let annotations = request.annotations();
517
518            if let Err(err) = Arc::clone(&self)
519                .handle_start_provider_task(
520                    config,
521                    provider_id,
522                    provider_ref,
523                    annotations.cloned().unwrap_or_default(),
524                    &host_id,
525                )
526                .await
527            {
528                error!(provider_ref, provider_id, ?err, "failed to start provider");
529                if let Err(err) = self
530                    .event_publisher
531                    .publish_event(
532                        "provider_start_failed",
533                        crate::event::provider_start_failed(
534                            provider_ref,
535                            provider_id,
536                            host_id,
537                            &err,
538                        ),
539                    )
540                    .await
541                {
542                    error!(?err, "failed to publish provider_start_failed event");
543                }
544            }
545        });
546
547        Ok(Some(CtlResponse::<()>::success(
548            "successfully started provider".into(),
549        )))
550    }
551
    /// Handles a stop-provider request for this host.
    ///
    /// Removes the provider from the host's provider map, sets its shutdown flag, asks the
    /// provider to shut down via a request on its shutdown subject (bounded by
    /// `provider_shutdown_delay`), aborts the provider's tasks and finally publishes a
    /// `provider_stopped` event.
    ///
    /// Returns an error response when no provider with the requested ID is present.
    ///
    /// # Errors
    ///
    /// Fails if the shutdown request payload cannot be encoded or if publishing the
    /// `provider_stopped` event fails. A failed or timed-out shutdown request is only logged.
    #[instrument(level = "debug", skip_all)]
    async fn handle_stop_provider(
        &self,
        request: StopProviderCommand,
    ) -> anyhow::Result<CtlResponse<()>> {
        let provider_id = request.provider_id();
        let host_id = request.host_id();

        debug!(provider_id, "handling stop provider");

        // NOTE(review): this write guard is never dropped explicitly, so it stays alive until
        // the function returns, including while the shutdown request below is awaited —
        // confirm that this is intended.
        let mut providers = self.providers.write().await;
        let hash_map::Entry::Occupied(entry) = providers.entry(provider_id.into()) else {
            warn!(
                provider_id,
                "received request to stop provider that is not running"
            );
            return Ok(CtlResponse::error("provider with that ID is not running"));
        };
        // Take the provider out of the map; only the parts needed below are bound.
        let Provider {
            ref annotations,
            mut tasks,
            shutdown,
            ..
        } = entry.remove();

        // Set the shutdown flag to true to stop health checks and config updates. Also
        // prevents restarting the provider but does not stop the provider process.
        shutdown.store(true, Ordering::Relaxed);

        // Send a request to the provider, requesting a graceful shutdown
        let req = serde_json::to_vec(&json!({ "host_id": host_id }))
            .context("failed to encode provider stop request")?;
        // The request carries the current trace context in its headers and times out after
        // the configured provider shutdown delay.
        let req = async_nats::Request::new()
            .payload(req.into())
            .timeout(self.host_config.provider_shutdown_delay)
            .headers(injector_to_headers(
                &TraceContextInjector::default_with_span(),
            ));
        if let Err(e) = self
            .rpc_nats
            .send_request(
                shutdown_subject(&self.host_config.lattice, provider_id, "default"),
                req,
            )
            .await
        {
            warn!(
                ?e,
                provider_id,
                "provider did not gracefully shut down in time, shutting down forcefully"
            );
            // NOTE: The provider child process is spawned with [tokio::process::Command::kill_on_drop],
            // so dropping the task will send a SIGKILL to the provider process.
        }

        // Stop the provider and health check / config changes tasks
        tasks.abort_all();

        info!(provider_id, "provider stopped");
        self.event_publisher
            .publish_event(
                "provider_stopped",
                crate::event::provider_stopped(annotations, host_id, provider_id, "stop"),
            )
            .await?;
        Ok(CtlResponse::<()>::success(
            "successfully stopped provider".into(),
        ))
    }
621
622    #[instrument(level = "debug", skip_all)]
623    async fn handle_inventory(&self) -> anyhow::Result<CtlResponse<HostInventory>> {
624        trace!("handling inventory");
625        let inventory = self.inventory().await;
626        Ok(CtlResponse::ok(inventory))
627    }
628
629    #[instrument(level = "trace", skip_all)]
630    async fn handle_claims(&self) -> anyhow::Result<CtlResponse<Vec<HashMap<String, String>>>> {
631        trace!("handling claims");
632
633        let (component_claims, provider_claims) =
634            join!(self.component_claims.read(), self.provider_claims.read());
635        let component_claims = component_claims.values().cloned().map(Claims::Component);
636        let provider_claims = provider_claims.values().cloned().map(Claims::Provider);
637        let claims: Vec<StoredClaims> = component_claims
638            .chain(provider_claims)
639            .flat_map(TryFrom::try_from)
640            .collect();
641
642        Ok(CtlResponse::ok(
643            claims.into_iter().map(std::convert::Into::into).collect(),
644        ))
645    }
646
647    #[instrument(level = "trace", skip_all)]
648    // TODO: Vec<&Link> return?
649    async fn handle_links(&self) -> anyhow::Result<Vec<u8>> {
650        trace!("handling links");
651
652        let links = self.links.read().await;
653        let links: Vec<&Link> = links.values().flatten().collect();
654        let res =
655            serde_json::to_vec(&CtlResponse::ok(links)).context("failed to serialize response")?;
656        Ok(res)
657    }
658
659    #[instrument(level = "trace", skip(self))]
660    async fn handle_config_get(&self, config_name: &str) -> anyhow::Result<Vec<u8>> {
661        trace!(%config_name, "handling get config");
662        if let Some(config_bytes) = self.config_store.get(config_name).await? {
663            let config_map: HashMap<String, String> = serde_json::from_slice(&config_bytes)
664                .context("config data should be a map of string -> string")?;
665            serde_json::to_vec(&CtlResponse::ok(config_map)).map_err(anyhow::Error::from)
666        } else {
667            serde_json::to_vec(&CtlResponse::<()>::success(
668                "Configuration not found".into(),
669            ))
670            .map_err(anyhow::Error::from)
671        }
672    }
673
674    #[instrument(level = "debug", skip_all, fields(%config_name))]
675    async fn handle_config_delete(&self, config_name: &str) -> anyhow::Result<CtlResponse<()>> {
676        debug!("handle config entry deletion");
677
678        self.config_store
679            .del(config_name)
680            .await
681            .context("Unable to delete config data")?;
682
683        self.event_publisher
684            .publish_event("config_deleted", crate::event::config_deleted(config_name))
685            .await?;
686
687        Ok(CtlResponse::<()>::success(
688            "successfully deleted config".into(),
689        ))
690    }
691
692    #[instrument(level = "debug", skip_all)]
693    async fn handle_label_put(
694        &self,
695        request: HostLabel,
696        host_id: &str,
697    ) -> anyhow::Result<CtlResponse<()>> {
698        let key = request.key();
699        if key.to_lowercase().starts_with("hostcore.") {
700            bail!("hostcore.* labels cannot be set dynamically");
701        }
702
703        let value = request.value();
704        let mut labels = self.labels.write().await;
705        match labels.entry(key.into()) {
706            BTreeMapEntry::Occupied(mut entry) => {
707                info!(key = entry.key(), value, "updated label");
708                entry.insert(value.into());
709            }
710            BTreeMapEntry::Vacant(entry) => {
711                info!(key = entry.key(), value, "set label");
712                entry.insert(value.into());
713            }
714        }
715
716        self.event_publisher
717            .publish_event(
718                "labels_changed",
719                crate::event::labels_changed(host_id, HashMap::from_iter(labels.clone())),
720            )
721            .await
722            .context("failed to publish labels_changed event")?;
723
724        Ok(CtlResponse::<()>::success("successfully put label".into()))
725    }
726
727    #[instrument(level = "debug", skip_all)]
728    async fn handle_label_del(
729        &self,
730        request: HostLabelIdentifier,
731        host_id: &str,
732    ) -> anyhow::Result<CtlResponse<()>> {
733        let key = request.key();
734        let mut labels = self.labels.write().await;
735        let value = labels.remove(key);
736
737        if value.is_none() {
738            warn!(key, "could not remove unset label");
739            return Ok(CtlResponse::<()>::success(
740                "successfully deleted label (no such label)".into(),
741            ));
742        };
743
744        info!(key, "removed label");
745        self.event_publisher
746            .publish_event(
747                "labels_changed",
748                crate::event::labels_changed(host_id, HashMap::from_iter(labels.clone())),
749            )
750            .await
751            .context("failed to publish labels_changed event")?;
752
753        Ok(CtlResponse::<()>::success(
754            "successfully deleted label".into(),
755        ))
756    }
757
758    /// Handle a new link by modifying the relevant source [crate::wasmbus::ComponentSpecification].
759    #[instrument(level = "debug", skip_all)]
760    async fn handle_link_put(&self, request: Link) -> anyhow::Result<CtlResponse<()>> {
761        let link_set_result: anyhow::Result<()> = async {
762            let source_id = request.source_id();
763            let target = request.target();
764            let wit_namespace = request.wit_namespace();
765            let wit_package = request.wit_package();
766            let interfaces = request.interfaces();
767            let name = request.name();
768
769            let ns_and_package = format!("{wit_namespace}:{wit_package}");
770            debug!(
771                source_id,
772                target,
773                ns_and_package,
774                name,
775                ?interfaces,
776                "handling put wrpc link definition"
777            );
778
779            // Validate all configurations
780            self.validate_config(
781                request
782                    .source_config()
783                    .clone()
784                    .iter()
785                    .chain(request.target_config())
786            ).await?;
787
788            let mut component_spec = self
789                .get_component_spec(source_id)
790                .await?
791                .unwrap_or_default();
792
793            // If the link is defined from this source on the same interface and link name, but to a different target,
794            // we need to reject this link and suggest deleting the existing link or using a different link name.
795            if let Some(existing_conflict_link) = component_spec.links.iter().find(|link| {
796                link.source_id() == source_id
797                    && link.wit_namespace() == wit_namespace
798                    && link.wit_package() == wit_package
799                    && link.name() == name
800                    // Check if interfaces have no intersection
801                    && link.interfaces().iter().any(|i| interfaces.contains(i))
802                    && link.target() != target
803            }) {
804                error!(
805                    source_id,
806                    desired_target = target,
807                    existing_target = existing_conflict_link.target(),
808                    ns_and_package,
809                    name,
810                    "link already exists with different target, consider deleting the existing link or using a different link name"
811                );
812                bail!("link already exists with different target, consider deleting the existing link or using a different link name");
813            }
814
815            // If we can find an existing link with the same source, target, namespace, package, and name, update it.
816            // Otherwise, add the new link to the component specification.
817            if let Some(existing_link_index) = component_spec.links.iter().position(|link| {
818                link.source_id() == source_id
819                    && link.target() == target
820                    && link.wit_namespace() == wit_namespace
821                    && link.wit_package() == wit_package
822                    && link.name() == name
823            }) {
824                if let Some(existing_link) = component_spec.links.get_mut(existing_link_index) {
825                    *existing_link = request.clone();
826                }
827            } else {
828                component_spec.links.push(request.clone());
829            };
830
831            // Update component specification with the new link
832            self.store_component_spec(&source_id, &component_spec)
833                .await?;
834            // NOTE: We rely on the NATS watcher to call update_host_with_spec asynchronously.
835            // Previously, we called it directly here, but this caused a race condition where
836            // self.links would be updated before the NATS watcher processed the change,
837            // preventing provider links from being sent on re-add scenarios.
838            // debug!(source_id, "calling update_host_with_spec directly from handle_link_put");
839            // self.update_host_with_spec(&source_id, &component_spec)
840            //     .await?;
841
842            self.put_backwards_compat_provider_link(&request)
843                .await?;
844
845            Ok(())
846        }
847        .await;
848
849        if let Err(e) = link_set_result {
850            self.event_publisher
851                .publish_event(
852                    "linkdef_set_failed",
853                    crate::event::linkdef_set_failed(&request, &e),
854                )
855                .await?;
856            Ok(CtlResponse::error(e.to_string().as_ref()))
857        } else {
858            self.event_publisher
859                .publish_event("linkdef_set", crate::event::linkdef_set(&request))
860                .await?;
861            Ok(CtlResponse::<()>::success("successfully set link".into()))
862        }
863    }
864
865    #[instrument(level = "debug", skip_all)]
866    /// Remove an interface link on a source component for a specific package
867    async fn handle_link_del(
868        &self,
869        request: DeleteInterfaceLinkDefinitionRequest,
870    ) -> anyhow::Result<CtlResponse<()>> {
871        let source_id = request.source_id();
872        let wit_namespace = request.wit_namespace();
873        let wit_package = request.wit_package();
874        let link_name = request.link_name();
875
876        let ns_and_package = format!("{wit_namespace}:{wit_package}");
877
878        debug!(
879            source_id,
880            ns_and_package, link_name, "handling del wrpc link definition"
881        );
882
883        let Some(mut component_spec) = self.get_component_spec(source_id).await? else {
884            // If the component spec doesn't exist, the link is deleted
885            return Ok(CtlResponse::<()>::success(
886                "successfully deleted link (spec doesn't exist)".into(),
887            ));
888        };
889
890        // If we can find an existing link with the same source, namespace, package, and name, remove it
891        // and update the component specification.
892        let deleted_link = if let Some(existing_link_index) =
893            component_spec.links.iter().position(|link| {
894                link.source_id() == source_id
895                    && link.wit_namespace() == wit_namespace
896                    && link.wit_package() == wit_package
897                    && link.name() == link_name
898            }) {
899            // Sanity safety check since `swap_remove` will panic if the index is out of bounds
900            if existing_link_index < component_spec.links.len() {
901                Some(component_spec.links.swap_remove(existing_link_index))
902            } else {
903                None
904            }
905        } else {
906            None
907        };
908
909        if let Some(link) = deleted_link.as_ref() {
910            // Update component specification with the deleted link
911            self.store_component_spec(&source_id, &component_spec)
912                .await?;
913            self.update_host_with_spec(&source_id, &component_spec)
914                .await?;
915
916            // Send the link to providers for deletion
917            self.del_provider_link(link).await?;
918        }
919
920        // For idempotency, we always publish the deleted event, even if the link didn't exist
921        let deleted_link_target = deleted_link
922            .as_ref()
923            .map(|link| String::from(link.target()));
924        self.event_publisher
925            .publish_event(
926                "linkdef_deleted",
927                crate::event::linkdef_deleted(
928                    source_id,
929                    deleted_link_target.as_ref(),
930                    link_name,
931                    wit_namespace,
932                    wit_package,
933                    deleted_link.as_ref().map(|link| link.interfaces()),
934                ),
935            )
936            .await?;
937
938        Ok(CtlResponse::<()>::success(
939            "successfully deleted link".into(),
940        ))
941    }
942
943    #[instrument(level = "debug", skip_all)]
944    async fn handle_registries_put(
945        &self,
946        request: HashMap<String, RegistryCredential>,
947    ) -> anyhow::Result<CtlResponse<()>> {
948        info!(
949            registries = ?request.keys(),
950            "updating registry config",
951        );
952
953        let mut registry_config = self.registry_config.write().await;
954        for (reg, new_creds) in request {
955            let mut new_config = new_creds.into_registry_config()?;
956            match registry_config.entry(reg) {
957                hash_map::Entry::Occupied(mut entry) => {
958                    entry.get_mut().set_auth(new_config.auth().clone());
959                }
960                hash_map::Entry::Vacant(entry) => {
961                    new_config.set_allow_latest(self.host_config.oci_opts.allow_latest);
962                    entry.insert(new_config);
963                }
964            }
965        }
966
967        Ok(CtlResponse::<()>::success(
968            "successfully put registries".into(),
969        ))
970    }
971
972    #[instrument(level = "debug", skip_all, fields(%config_name))]
973    async fn handle_config_put(
974        &self,
975        config_name: &str,
976        data: Bytes,
977    ) -> anyhow::Result<CtlResponse<()>> {
978        debug!("handle config entry put");
979        // Validate that the data is of the proper type by deserialing it
980        serde_json::from_slice::<HashMap<String, String>>(&data)
981            .context("config data should be a map of string -> string")?;
982        self.config_store
983            .put(config_name, data)
984            .await
985            .context("unable to store config data")?;
986        // We don't write it into the cached data and instead let the caching thread handle it as we
987        // won't need it immediately.
988        self.event_publisher
989            .publish_event("config_set", crate::event::config_set(config_name))
990            .await?;
991
992        Ok(CtlResponse::<()>::success("successfully put config".into()))
993    }
994
995    #[instrument(level = "debug", skip_all)]
996    async fn handle_ping_hosts(
997        &self,
998    ) -> anyhow::Result<CtlResponse<wasmcloud_control_interface::Host>> {
999        trace!("replying to ping");
1000        let uptime = self.start_at.elapsed();
1001
1002        let mut host = wasmcloud_control_interface::Host::builder()
1003            .id(self.host_key.public_key())
1004            .labels(self.labels.read().await.clone())
1005            .friendly_name(self.friendly_name.clone())
1006            .uptime_seconds(uptime.as_secs())
1007            .uptime_human(human_friendly_uptime(uptime))
1008            .version(self.host_config.version.clone())
1009            .ctl_host(self.host_config.rpc_nats_url.to_string())
1010            .rpc_host(self.host_config.rpc_nats_url.to_string())
1011            .lattice(self.host_config.lattice.to_string());
1012
1013        if let Some(ref js_domain) = self.host_config.js_domain {
1014            host = host.js_domain(js_domain.clone());
1015        }
1016
1017        let host = host
1018            .build()
1019            .map_err(|e| anyhow!("failed to build host message: {e}"))?;
1020
1021        Ok(CtlResponse::ok(host))
1022    }
1023}