1use core::sync::atomic::Ordering;
2
3use std::collections::btree_map::Entry as BTreeMapEntry;
4use std::collections::{hash_map, HashMap};
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{anyhow, bail, Context as _};
9use bytes::Bytes;
10use futures::join;
11use serde_json::json;
12use tokio::spawn;
13use tokio::time::Instant;
14use tracing::{debug, error, info, instrument, trace, warn};
15use wasmcloud_control_interface::{
16 ComponentAuctionAck, ComponentAuctionRequest, CtlResponse,
17 DeleteInterfaceLinkDefinitionRequest, HostInventory, HostLabel, HostLabelIdentifier, Link,
18 ProviderAuctionAck, ProviderAuctionRequest, RegistryCredential, ScaleComponentCommand,
19 StartProviderCommand, StopHostCommand, StopProviderCommand, UpdateComponentCommand,
20};
21use wasmcloud_core::shutdown_subject;
22use wasmcloud_tracing::context::TraceContextInjector;
23
24use crate::registry::RegistryCredentialExt;
25use crate::wasmbus::{
26 human_friendly_uptime, injector_to_headers, Annotations, Claims, Host, Provider, StoredClaims,
27};
28use crate::ResourceRef;
29
30#[async_trait::async_trait]
36pub trait ControlInterfaceServer: Send + Sync {
37 async fn handle_auction_component(
40 &self,
41 request: ComponentAuctionRequest,
42 ) -> anyhow::Result<Option<CtlResponse<ComponentAuctionAck>>>;
43 async fn handle_auction_provider(
46 &self,
47 request: ProviderAuctionRequest,
48 ) -> anyhow::Result<Option<CtlResponse<ProviderAuctionAck>>>;
49
50 async fn handle_stop_host(&self, request: StopHostCommand) -> anyhow::Result<CtlResponse<()>>;
53
54 async fn handle_scale_component(
57 self: Arc<Self>,
58 request: ScaleComponentCommand,
59 ) -> anyhow::Result<CtlResponse<()>>;
60
61 async fn handle_update_component(
64 self: Arc<Self>,
65 request: UpdateComponentCommand,
66 ) -> anyhow::Result<CtlResponse<()>>;
67
68 async fn handle_start_provider(
71 self: Arc<Self>,
72 request: StartProviderCommand,
73 ) -> anyhow::Result<Option<CtlResponse<()>>>;
74
75 async fn handle_stop_provider(
78 &self,
79 request: StopProviderCommand,
80 ) -> anyhow::Result<CtlResponse<()>>;
81
82 async fn handle_inventory(&self) -> anyhow::Result<CtlResponse<HostInventory>>;
85
86 async fn handle_claims(&self) -> anyhow::Result<CtlResponse<Vec<HashMap<String, String>>>>;
89
90 async fn handle_links(&self) -> anyhow::Result<Vec<u8>>;
93
94 async fn handle_config_get(&self, config_name: &str) -> anyhow::Result<Vec<u8>>;
97
98 async fn handle_config_delete(&self, config_name: &str) -> anyhow::Result<CtlResponse<()>>;
101
102 async fn handle_label_put(
105 &self,
106 request: HostLabel,
107 host_id: &str,
108 ) -> anyhow::Result<CtlResponse<()>>;
109
110 async fn handle_label_del(
113 &self,
114 request: HostLabelIdentifier,
115 host_id: &str,
116 ) -> anyhow::Result<CtlResponse<()>>;
117
118 async fn handle_link_put(&self, request: Link) -> anyhow::Result<CtlResponse<()>>;
121
122 async fn handle_link_del(
125 &self,
126 request: DeleteInterfaceLinkDefinitionRequest,
127 ) -> anyhow::Result<CtlResponse<()>>;
128
129 async fn handle_registries_put(
132 &self,
133 request: HashMap<String, RegistryCredential>,
134 ) -> anyhow::Result<CtlResponse<()>>;
135
136 async fn handle_config_put(
139 &self,
140 config_name: &str,
141 data: Bytes,
142 ) -> anyhow::Result<CtlResponse<()>>;
143
144 async fn handle_ping_hosts(
147 &self,
148 ) -> anyhow::Result<CtlResponse<wasmcloud_control_interface::Host>>;
149}
150
151#[async_trait::async_trait]
152impl ControlInterfaceServer for Host {
153 #[instrument(level = "debug", skip_all)]
154 async fn handle_auction_component(
155 &self,
156 request: ComponentAuctionRequest,
157 ) -> anyhow::Result<Option<CtlResponse<ComponentAuctionAck>>> {
158 let component_ref = request.component_ref();
159 let component_id = request.component_id();
160 let constraints = request.constraints();
161
162 info!(
163 component_ref,
164 component_id,
165 ?constraints,
166 "handling auction for component"
167 );
168
169 let host_labels = self.labels.read().await;
170 let constraints_satisfied = constraints
171 .iter()
172 .all(|(k, v)| host_labels.get(k).is_some_and(|hv| hv == v));
173 let component_id_running = self.components.read().await.contains_key(component_id);
174
175 if constraints_satisfied && !component_id_running {
177 Ok(Some(CtlResponse::ok(
178 ComponentAuctionAck::from_component_host_and_constraints(
179 component_ref,
180 component_id,
181 &self.host_key.public_key(),
182 constraints.clone(),
183 ),
184 )))
185 } else {
186 Ok(None)
187 }
188 }
189
190 #[instrument(level = "debug", skip_all)]
191 async fn handle_auction_provider(
192 &self,
193 request: ProviderAuctionRequest,
194 ) -> anyhow::Result<Option<CtlResponse<ProviderAuctionAck>>> {
195 let provider_ref = request.provider_ref();
196 let provider_id = request.provider_id();
197 let constraints = request.constraints();
198
199 info!(
200 provider_ref,
201 provider_id,
202 ?constraints,
203 "handling auction for provider"
204 );
205
206 let host_labels = self.labels.read().await;
207 let constraints_satisfied = constraints
208 .iter()
209 .all(|(k, v)| host_labels.get(k).is_some_and(|hv| hv == v));
210 let providers = self.providers.read().await;
211 let provider_running = providers.contains_key(provider_id);
212 if constraints_satisfied && !provider_running {
213 Ok(Some(CtlResponse::ok(
214 ProviderAuctionAck::builder()
215 .provider_ref(provider_ref.into())
216 .provider_id(provider_id.into())
217 .constraints(constraints.clone())
218 .host_id(self.host_key.public_key())
219 .build()
220 .map_err(|e| anyhow!("failed to build provider auction ack: {e}"))?,
221 )))
222 } else {
223 Ok(None)
224 }
225 }
226
227 #[instrument(level = "debug", skip_all)]
228 async fn handle_stop_host(&self, request: StopHostCommand) -> anyhow::Result<CtlResponse<()>> {
229 let timeout = request.timeout();
230
231 info!(?timeout, "handling stop host");
232
233 self.ready.store(false, Ordering::Relaxed);
234 self.heartbeat.abort();
235 let deadline =
236 timeout.and_then(|timeout| Instant::now().checked_add(Duration::from_millis(timeout)));
237 self.stop_tx.send_replace(deadline);
238
239 Ok(CtlResponse::<()>::success(
240 "successfully handled stop host".into(),
241 ))
242 }
243 #[instrument(level = "debug", skip_all)]
244 async fn handle_scale_component(
245 self: Arc<Self>,
246 request: ScaleComponentCommand,
247 ) -> anyhow::Result<CtlResponse<()>> {
248 let component_ref = request.component_ref();
249 let component_id = request.component_id();
250 let annotations = request.annotations();
251 let max_instances = request.max_instances();
252 let component_limits = request.component_limits();
253 let config = request.config().clone();
254 let allow_update = request.allow_update();
255 let host_id = request.host_id();
256
257 debug!(
258 component_ref,
259 max_instances, component_id, "handling scale component"
260 );
261
262 let host_id = host_id.to_string();
263 let annotations: Annotations = annotations
264 .cloned()
265 .unwrap_or_default()
266 .into_iter()
267 .collect();
268
269 let (original_ref, ref_changed) = {
272 self.components
273 .read()
274 .await
275 .get(component_id)
276 .map(|v| {
277 (
278 Some(Arc::clone(&v.image_reference)),
279 &*v.image_reference != component_ref,
280 )
281 })
282 .unwrap_or_else(|| (None, false))
283 };
284
285 let mut perform_post_update: bool = false;
286 let message = match (allow_update, original_ref, ref_changed) {
287 (false, Some(original_ref), true) => {
289 let msg = format!(
290 "Requested to scale existing component to a different image reference: {original_ref} != {component_ref}. The component will be scaled but the image reference will not be updated. If you meant to update this component to a new image ref, use the update command."
291 );
292 warn!(msg);
293 msg
294 }
295 (true, Some(original_ref), true) => {
297 perform_post_update = true;
298 format!(
299 "Requested to scale existing component, with a changed image reference: {original_ref} != {component_ref}. The component will be scaled, and the image reference will be updated afterwards."
300 )
301 }
302 _ => String::with_capacity(0),
303 };
304
305 let component_id = Arc::from(component_id);
306 let component_ref = Arc::from(component_ref);
307 spawn(async move {
309 let component_and_claims =
311 self.fetch_component(&component_ref)
312 .await
313 .map(|component_bytes| {
314 let claims_token =
317 wasmcloud_runtime::component::claims_token(&component_bytes);
318 (component_bytes, claims_token)
319 });
320 let (wasm, claims_token, retrieval_error) = match component_and_claims {
321 Ok((wasm, Ok(claims_token))) => (Some(wasm), claims_token, None),
322 Ok((_, Err(e))) => {
323 if let Err(e) = self
324 .event_publisher
325 .publish_event(
326 "component_scale_failed",
327 crate::event::component_scale_failed(
328 None,
329 &annotations,
330 host_id,
331 &component_ref,
332 &component_id,
333 max_instances,
334 &e,
335 ),
336 )
337 .await
338 {
339 error!(%component_ref, %component_id, err = ?e, "failed to publish component scale failed event");
340 }
341 return;
342 }
343 Err(e) => (None, None, Some(e)),
344 };
345 if let Err(e) = self
347 .handle_scale_component_task(
348 Arc::clone(&component_ref),
349 Arc::clone(&component_id),
350 &host_id,
351 max_instances,
352 component_limits,
353 &annotations,
354 config,
355 wasm.ok_or_else(|| {
356 retrieval_error.unwrap_or_else(|| anyhow!("unexpected missing wasm binary"))
357 }),
358 claims_token.as_ref(),
359 )
360 .await
361 {
362 error!(%component_ref, %component_id, err = ?e, "failed to scale component");
363 if let Err(e) = self
364 .event_publisher
365 .publish_event(
366 "component_scale_failed",
367 crate::event::component_scale_failed(
368 claims_token.map(|c| c.claims).as_ref(),
369 &annotations,
370 host_id,
371 &component_ref,
372 &component_id,
373 max_instances,
374 &e,
375 ),
376 )
377 .await
378 {
379 error!(%component_ref, %component_id, err = ?e, "failed to publish component scale failed event");
380 }
381 return;
382 }
383
384 if perform_post_update {
385 if let Err(e) = self
386 .handle_update_component_task(
387 Arc::clone(&component_id),
388 Arc::clone(&component_ref),
389 &host_id,
390 None,
391 )
392 .await
393 {
394 error!(%component_ref, %component_id, err = ?e, "failed to update component after scale");
395 }
396 }
397 });
398
399 Ok(CtlResponse::<()>::success(message))
400 }
401
402 #[instrument(level = "debug", skip_all)]
406 async fn handle_update_component(
407 self: Arc<Self>,
408 request: UpdateComponentCommand,
409 ) -> anyhow::Result<CtlResponse<()>> {
410 let component_id = request.component_id();
411 let annotations = request.annotations().cloned();
412 let new_component_ref = request.new_component_ref();
413 let host_id = request.host_id();
414
415 debug!(
416 component_id,
417 new_component_ref,
418 ?annotations,
419 "handling update component"
420 );
421
422 #[allow(clippy::map_clone)]
424 let Some(component_ref) = self
427 .components
428 .read()
429 .await
430 .get(component_id)
431 .map(|component| Arc::clone(&component.image_reference))
432 else {
433 return Ok(CtlResponse::error(&format!(
434 "component {component_id} not found"
435 )));
436 };
437
438 if &*component_ref == new_component_ref {
440 return Ok(CtlResponse::<()>::success(format!(
441 "component {component_id} already updated to {new_component_ref}"
442 )));
443 }
444
445 let host_id = host_id.to_string();
446 let message = format!(
447 "component {component_id} updating from {component_ref} to {new_component_ref}"
448 );
449 let component_id = Arc::from(component_id);
450 let new_component_ref = Arc::from(new_component_ref);
451 spawn(async move {
452 if let Err(e) = self
453 .handle_update_component_task(
454 Arc::clone(&component_id),
455 Arc::clone(&new_component_ref),
456 &host_id,
457 annotations,
458 )
459 .await
460 {
461 error!(%new_component_ref, %component_id, err = ?e, "failed to update component");
462 }
463 });
464
465 Ok(CtlResponse::<()>::success(message))
466 }
467
468 #[instrument(level = "debug", skip_all)]
469 async fn handle_start_provider(
470 self: Arc<Self>,
471 request: StartProviderCommand,
472 ) -> anyhow::Result<Option<CtlResponse<()>>> {
473 if self
474 .providers
475 .read()
476 .await
477 .contains_key(request.provider_id())
478 {
479 return Ok(Some(CtlResponse::error(
480 "provider with that ID is already running",
481 )));
482 }
483
484 if let Ok(ResourceRef::Builtin(name)) = ResourceRef::try_from(request.provider_ref()) {
486 if !self.experimental_features.builtin_http_server && name == "http-server" {
487 debug!(
488 provider_ref = request.provider_ref(),
489 provider_id = request.provider_id(),
490 "skipping start provider for disabled builtin http provider"
491 );
492 return Ok(None);
493 }
494 if !self.experimental_features.builtin_messaging_nats && name == "messaging-nats" {
495 debug!(
496 provider_ref = request.provider_ref(),
497 provider_id = request.provider_id(),
498 "skipping start provider for disabled builtin messaging provider"
499 );
500 return Ok(None);
501 }
502 }
503
504 info!(
506 provider_ref = request.provider_ref(),
507 provider_id = request.provider_id(),
508 "handling start provider"
509 );
510
511 let host_id = request.host_id().to_string();
512 spawn(async move {
513 let config = request.config();
514 let provider_id = request.provider_id();
515 let provider_ref = request.provider_ref();
516 let annotations = request.annotations();
517
518 if let Err(err) = Arc::clone(&self)
519 .handle_start_provider_task(
520 config,
521 provider_id,
522 provider_ref,
523 annotations.cloned().unwrap_or_default(),
524 &host_id,
525 )
526 .await
527 {
528 error!(provider_ref, provider_id, ?err, "failed to start provider");
529 if let Err(err) = self
530 .event_publisher
531 .publish_event(
532 "provider_start_failed",
533 crate::event::provider_start_failed(
534 provider_ref,
535 provider_id,
536 host_id,
537 &err,
538 ),
539 )
540 .await
541 {
542 error!(?err, "failed to publish provider_start_failed event");
543 }
544 }
545 });
546
547 Ok(Some(CtlResponse::<()>::success(
548 "successfully started provider".into(),
549 )))
550 }
551
552 #[instrument(level = "debug", skip_all)]
553 async fn handle_stop_provider(
554 &self,
555 request: StopProviderCommand,
556 ) -> anyhow::Result<CtlResponse<()>> {
557 let provider_id = request.provider_id();
558 let host_id = request.host_id();
559
560 debug!(provider_id, "handling stop provider");
561
562 let mut providers = self.providers.write().await;
563 let hash_map::Entry::Occupied(entry) = providers.entry(provider_id.into()) else {
564 warn!(
565 provider_id,
566 "received request to stop provider that is not running"
567 );
568 return Ok(CtlResponse::error("provider with that ID is not running"));
569 };
570 let Provider {
571 ref annotations,
572 mut tasks,
573 shutdown,
574 ..
575 } = entry.remove();
576
577 shutdown.store(true, Ordering::Relaxed);
580
581 let req = serde_json::to_vec(&json!({ "host_id": host_id }))
583 .context("failed to encode provider stop request")?;
584 let req = async_nats::Request::new()
585 .payload(req.into())
586 .timeout(self.host_config.provider_shutdown_delay)
587 .headers(injector_to_headers(
588 &TraceContextInjector::default_with_span(),
589 ));
590 if let Err(e) = self
591 .rpc_nats
592 .send_request(
593 shutdown_subject(&self.host_config.lattice, provider_id, "default"),
594 req,
595 )
596 .await
597 {
598 warn!(
599 ?e,
600 provider_id,
601 "provider did not gracefully shut down in time, shutting down forcefully"
602 );
603 }
606
607 tasks.abort_all();
609
610 info!(provider_id, "provider stopped");
611 self.event_publisher
612 .publish_event(
613 "provider_stopped",
614 crate::event::provider_stopped(annotations, host_id, provider_id, "stop"),
615 )
616 .await?;
617 Ok(CtlResponse::<()>::success(
618 "successfully stopped provider".into(),
619 ))
620 }
621
622 #[instrument(level = "debug", skip_all)]
623 async fn handle_inventory(&self) -> anyhow::Result<CtlResponse<HostInventory>> {
624 trace!("handling inventory");
625 let inventory = self.inventory().await;
626 Ok(CtlResponse::ok(inventory))
627 }
628
629 #[instrument(level = "trace", skip_all)]
630 async fn handle_claims(&self) -> anyhow::Result<CtlResponse<Vec<HashMap<String, String>>>> {
631 trace!("handling claims");
632
633 let (component_claims, provider_claims) =
634 join!(self.component_claims.read(), self.provider_claims.read());
635 let component_claims = component_claims.values().cloned().map(Claims::Component);
636 let provider_claims = provider_claims.values().cloned().map(Claims::Provider);
637 let claims: Vec<StoredClaims> = component_claims
638 .chain(provider_claims)
639 .flat_map(TryFrom::try_from)
640 .collect();
641
642 Ok(CtlResponse::ok(
643 claims.into_iter().map(std::convert::Into::into).collect(),
644 ))
645 }
646
647 #[instrument(level = "trace", skip_all)]
648 async fn handle_links(&self) -> anyhow::Result<Vec<u8>> {
650 trace!("handling links");
651
652 let links = self.links.read().await;
653 let links: Vec<&Link> = links.values().flatten().collect();
654 let res =
655 serde_json::to_vec(&CtlResponse::ok(links)).context("failed to serialize response")?;
656 Ok(res)
657 }
658
659 #[instrument(level = "trace", skip(self))]
660 async fn handle_config_get(&self, config_name: &str) -> anyhow::Result<Vec<u8>> {
661 trace!(%config_name, "handling get config");
662 if let Some(config_bytes) = self.config_store.get(config_name).await? {
663 let config_map: HashMap<String, String> = serde_json::from_slice(&config_bytes)
664 .context("config data should be a map of string -> string")?;
665 serde_json::to_vec(&CtlResponse::ok(config_map)).map_err(anyhow::Error::from)
666 } else {
667 serde_json::to_vec(&CtlResponse::<()>::success(
668 "Configuration not found".into(),
669 ))
670 .map_err(anyhow::Error::from)
671 }
672 }
673
674 #[instrument(level = "debug", skip_all, fields(%config_name))]
675 async fn handle_config_delete(&self, config_name: &str) -> anyhow::Result<CtlResponse<()>> {
676 debug!("handle config entry deletion");
677
678 self.config_store
679 .del(config_name)
680 .await
681 .context("Unable to delete config data")?;
682
683 self.event_publisher
684 .publish_event("config_deleted", crate::event::config_deleted(config_name))
685 .await?;
686
687 Ok(CtlResponse::<()>::success(
688 "successfully deleted config".into(),
689 ))
690 }
691
692 #[instrument(level = "debug", skip_all)]
693 async fn handle_label_put(
694 &self,
695 request: HostLabel,
696 host_id: &str,
697 ) -> anyhow::Result<CtlResponse<()>> {
698 let key = request.key();
699 if key.to_lowercase().starts_with("hostcore.") {
700 bail!("hostcore.* labels cannot be set dynamically");
701 }
702
703 let value = request.value();
704 let mut labels = self.labels.write().await;
705 match labels.entry(key.into()) {
706 BTreeMapEntry::Occupied(mut entry) => {
707 info!(key = entry.key(), value, "updated label");
708 entry.insert(value.into());
709 }
710 BTreeMapEntry::Vacant(entry) => {
711 info!(key = entry.key(), value, "set label");
712 entry.insert(value.into());
713 }
714 }
715
716 self.event_publisher
717 .publish_event(
718 "labels_changed",
719 crate::event::labels_changed(host_id, HashMap::from_iter(labels.clone())),
720 )
721 .await
722 .context("failed to publish labels_changed event")?;
723
724 Ok(CtlResponse::<()>::success("successfully put label".into()))
725 }
726
727 #[instrument(level = "debug", skip_all)]
728 async fn handle_label_del(
729 &self,
730 request: HostLabelIdentifier,
731 host_id: &str,
732 ) -> anyhow::Result<CtlResponse<()>> {
733 let key = request.key();
734 let mut labels = self.labels.write().await;
735 let value = labels.remove(key);
736
737 if value.is_none() {
738 warn!(key, "could not remove unset label");
739 return Ok(CtlResponse::<()>::success(
740 "successfully deleted label (no such label)".into(),
741 ));
742 };
743
744 info!(key, "removed label");
745 self.event_publisher
746 .publish_event(
747 "labels_changed",
748 crate::event::labels_changed(host_id, HashMap::from_iter(labels.clone())),
749 )
750 .await
751 .context("failed to publish labels_changed event")?;
752
753 Ok(CtlResponse::<()>::success(
754 "successfully deleted label".into(),
755 ))
756 }
757
758 #[instrument(level = "debug", skip_all)]
760 async fn handle_link_put(&self, request: Link) -> anyhow::Result<CtlResponse<()>> {
761 let link_set_result: anyhow::Result<()> = async {
762 let source_id = request.source_id();
763 let target = request.target();
764 let wit_namespace = request.wit_namespace();
765 let wit_package = request.wit_package();
766 let interfaces = request.interfaces();
767 let name = request.name();
768
769 let ns_and_package = format!("{wit_namespace}:{wit_package}");
770 debug!(
771 source_id,
772 target,
773 ns_and_package,
774 name,
775 ?interfaces,
776 "handling put wrpc link definition"
777 );
778
779 self.validate_config(
781 request
782 .source_config()
783 .clone()
784 .iter()
785 .chain(request.target_config())
786 ).await?;
787
788 let mut component_spec = self
789 .get_component_spec(source_id)
790 .await?
791 .unwrap_or_default();
792
793 if let Some(existing_conflict_link) = component_spec.links.iter().find(|link| {
796 link.source_id() == source_id
797 && link.wit_namespace() == wit_namespace
798 && link.wit_package() == wit_package
799 && link.name() == name
800 && link.interfaces().iter().any(|i| interfaces.contains(i))
802 && link.target() != target
803 }) {
804 error!(
805 source_id,
806 desired_target = target,
807 existing_target = existing_conflict_link.target(),
808 ns_and_package,
809 name,
810 "link already exists with different target, consider deleting the existing link or using a different link name"
811 );
812 bail!("link already exists with different target, consider deleting the existing link or using a different link name");
813 }
814
815 if let Some(existing_link_index) = component_spec.links.iter().position(|link| {
818 link.source_id() == source_id
819 && link.target() == target
820 && link.wit_namespace() == wit_namespace
821 && link.wit_package() == wit_package
822 && link.name() == name
823 }) {
824 if let Some(existing_link) = component_spec.links.get_mut(existing_link_index) {
825 *existing_link = request.clone();
826 }
827 } else {
828 component_spec.links.push(request.clone());
829 };
830
831 self.store_component_spec(&source_id, &component_spec)
833 .await?;
834 self.put_backwards_compat_provider_link(&request)
843 .await?;
844
845 Ok(())
846 }
847 .await;
848
849 if let Err(e) = link_set_result {
850 self.event_publisher
851 .publish_event(
852 "linkdef_set_failed",
853 crate::event::linkdef_set_failed(&request, &e),
854 )
855 .await?;
856 Ok(CtlResponse::error(e.to_string().as_ref()))
857 } else {
858 self.event_publisher
859 .publish_event("linkdef_set", crate::event::linkdef_set(&request))
860 .await?;
861 Ok(CtlResponse::<()>::success("successfully set link".into()))
862 }
863 }
864
865 #[instrument(level = "debug", skip_all)]
866 async fn handle_link_del(
868 &self,
869 request: DeleteInterfaceLinkDefinitionRequest,
870 ) -> anyhow::Result<CtlResponse<()>> {
871 let source_id = request.source_id();
872 let wit_namespace = request.wit_namespace();
873 let wit_package = request.wit_package();
874 let link_name = request.link_name();
875
876 let ns_and_package = format!("{wit_namespace}:{wit_package}");
877
878 debug!(
879 source_id,
880 ns_and_package, link_name, "handling del wrpc link definition"
881 );
882
883 let Some(mut component_spec) = self.get_component_spec(source_id).await? else {
884 return Ok(CtlResponse::<()>::success(
886 "successfully deleted link (spec doesn't exist)".into(),
887 ));
888 };
889
890 let deleted_link = if let Some(existing_link_index) =
893 component_spec.links.iter().position(|link| {
894 link.source_id() == source_id
895 && link.wit_namespace() == wit_namespace
896 && link.wit_package() == wit_package
897 && link.name() == link_name
898 }) {
899 if existing_link_index < component_spec.links.len() {
901 Some(component_spec.links.swap_remove(existing_link_index))
902 } else {
903 None
904 }
905 } else {
906 None
907 };
908
909 if let Some(link) = deleted_link.as_ref() {
910 self.store_component_spec(&source_id, &component_spec)
912 .await?;
913 self.update_host_with_spec(&source_id, &component_spec)
914 .await?;
915
916 self.del_provider_link(link).await?;
918 }
919
920 let deleted_link_target = deleted_link
922 .as_ref()
923 .map(|link| String::from(link.target()));
924 self.event_publisher
925 .publish_event(
926 "linkdef_deleted",
927 crate::event::linkdef_deleted(
928 source_id,
929 deleted_link_target.as_ref(),
930 link_name,
931 wit_namespace,
932 wit_package,
933 deleted_link.as_ref().map(|link| link.interfaces()),
934 ),
935 )
936 .await?;
937
938 Ok(CtlResponse::<()>::success(
939 "successfully deleted link".into(),
940 ))
941 }
942
943 #[instrument(level = "debug", skip_all)]
944 async fn handle_registries_put(
945 &self,
946 request: HashMap<String, RegistryCredential>,
947 ) -> anyhow::Result<CtlResponse<()>> {
948 info!(
949 registries = ?request.keys(),
950 "updating registry config",
951 );
952
953 let mut registry_config = self.registry_config.write().await;
954 for (reg, new_creds) in request {
955 let mut new_config = new_creds.into_registry_config()?;
956 match registry_config.entry(reg) {
957 hash_map::Entry::Occupied(mut entry) => {
958 entry.get_mut().set_auth(new_config.auth().clone());
959 }
960 hash_map::Entry::Vacant(entry) => {
961 new_config.set_allow_latest(self.host_config.oci_opts.allow_latest);
962 entry.insert(new_config);
963 }
964 }
965 }
966
967 Ok(CtlResponse::<()>::success(
968 "successfully put registries".into(),
969 ))
970 }
971
972 #[instrument(level = "debug", skip_all, fields(%config_name))]
973 async fn handle_config_put(
974 &self,
975 config_name: &str,
976 data: Bytes,
977 ) -> anyhow::Result<CtlResponse<()>> {
978 debug!("handle config entry put");
979 serde_json::from_slice::<HashMap<String, String>>(&data)
981 .context("config data should be a map of string -> string")?;
982 self.config_store
983 .put(config_name, data)
984 .await
985 .context("unable to store config data")?;
986 self.event_publisher
989 .publish_event("config_set", crate::event::config_set(config_name))
990 .await?;
991
992 Ok(CtlResponse::<()>::success("successfully put config".into()))
993 }
994
995 #[instrument(level = "debug", skip_all)]
996 async fn handle_ping_hosts(
997 &self,
998 ) -> anyhow::Result<CtlResponse<wasmcloud_control_interface::Host>> {
999 trace!("replying to ping");
1000 let uptime = self.start_at.elapsed();
1001
1002 let mut host = wasmcloud_control_interface::Host::builder()
1003 .id(self.host_key.public_key())
1004 .labels(self.labels.read().await.clone())
1005 .friendly_name(self.friendly_name.clone())
1006 .uptime_seconds(uptime.as_secs())
1007 .uptime_human(human_friendly_uptime(uptime))
1008 .version(self.host_config.version.clone())
1009 .ctl_host(self.host_config.rpc_nats_url.to_string())
1010 .rpc_host(self.host_config.rpc_nats_url.to_string())
1011 .lattice(self.host_config.lattice.to_string());
1012
1013 if let Some(ref js_domain) = self.host_config.js_domain {
1014 host = host.js_domain(js_domain.clone());
1015 }
1016
1017 let host = host
1018 .build()
1019 .map_err(|e| anyhow!("failed to build host message: {e}"))?;
1020
1021 Ok(CtlResponse::ok(host))
1022 }
1023}