wasmcloud_host/
event.rs

1use std::collections::{BTreeMap, HashMap};
2
3use serde_json::json;
4use wascap::jwt;
5use wasmcloud_control_interface::Link;
6
7/// A trait for publishing wasmbus events. This can be implemented by any transport or bus
8/// implementation that can send the serialized event to the appropriate destination.
9///
10/// TODO(#4408): This trait can certainly be enhanced by having a more strongly typed event,
11/// e.g. WasmbusEvent which implements Serialize.
12#[async_trait::async_trait]
13pub trait EventPublisher: Send + Sync {
14    /// Publish an event that occurred in the host. The event name is the type of event being published, and the
15    /// data is the payload of the event. It's up to the implementation to
16    /// determine how to handle events. By default, this is a no-op.
17    async fn publish_event(
18        &self,
19        _event_name: &str,
20        _data: serde_json::Value,
21    ) -> anyhow::Result<()> {
22        Ok(())
23    }
24}
25
26/// A default implementation of the EventPublisher trait that does nothing.
27/// This is useful for testing or when no event publishing is required.
28#[derive(Default)]
29pub struct DefaultEventPublisher {}
30impl EventPublisher for DefaultEventPublisher {}
31
32fn format_component_claims(claims: &jwt::Claims<jwt::Component>) -> serde_json::Value {
33    let issuer = &claims.issuer;
34    let not_before_human = claims
35        .not_before
36        .map(|n| n.to_string())
37        .unwrap_or_else(|| "never".to_string());
38    let expires_human = claims
39        .expires
40        .map(|n| n.to_string())
41        .unwrap_or_else(|| "never".to_string());
42    if let Some(component) = &claims.metadata {
43        json!({
44            "call_alias": component.call_alias,
45            "issuer": issuer,
46            "tags": component.tags,
47            "name": component.name,
48            "version": component.ver,
49            "revision": component.rev,
50            "not_before_human": not_before_human,
51            "expires_human": expires_human,
52        })
53    } else {
54        json!({
55            "issuer": issuer,
56            "not_before_human": not_before_human,
57            "expires_human": expires_human,
58        })
59    }
60}
61
62/// Generates an event payload for when a component is scaled
63///
64/// # Arguments
65/// * `claims` - Optional component claims
66/// * `annotations` - Key-value pairs of metadata annotations
67/// * `host_id` - ID of the host where scaling occurred
68/// * `max_instances` - Maximum number of instances to scale to
69/// * `image_ref` - Reference to the component image
70/// * `component_id` - Unique identifier for the component
71///
72/// # Returns
73/// JSON object containing scaling details and component metadata
74pub fn component_scaled(
75    claims: Option<&jwt::Claims<jwt::Component>>,
76    annotations: &BTreeMap<String, String>,
77    host_id: impl AsRef<str>,
78    max_instances: impl Into<usize>,
79    image_ref: impl AsRef<str>,
80    component_id: impl AsRef<str>,
81) -> serde_json::Value {
82    if let Some(claims) = claims {
83        json!({
84            "public_key": claims.subject,
85            "claims": format_component_claims(claims),
86            "annotations": annotations,
87            "host_id": host_id.as_ref(),
88            "image_ref": image_ref.as_ref(),
89            "max_instances": max_instances.into(),
90            "component_id": component_id.as_ref(),
91        })
92    } else {
93        json!({
94            "annotations": annotations,
95            "host_id": host_id.as_ref(),
96            "image_ref": image_ref.as_ref(),
97            "max_instances": max_instances.into(),
98            "component_id": component_id.as_ref(),
99        })
100    }
101}
102
103/// Generates an event payload for when component scaling fails
104///
105/// # Arguments
106/// * `claims` - Optional component claims
107/// * `annotations` - Key-value pairs of metadata annotations
108/// * `host_id` - ID of the host where scaling failed
109/// * `image_ref` - Reference to the component image
110/// * `component_id` - Unique identifier for the component
111/// * `max_instances` - Target number of instances that failed to scale
112/// * `error` - The error that caused the scaling failure
113///
114/// # Returns
115/// JSON object containing scaling failure details and error information
116pub fn component_scale_failed(
117    claims: Option<&jwt::Claims<jwt::Component>>,
118    annotations: &BTreeMap<String, String>,
119    host_id: impl AsRef<str>,
120    image_ref: impl AsRef<str>,
121    component_id: impl AsRef<str>,
122    max_instances: u32,
123    error: &anyhow::Error,
124) -> serde_json::Value {
125    if let Some(claims) = claims {
126        json!({
127            "public_key": claims.subject,
128            "component_id": component_id.as_ref(),
129            "annotations": annotations,
130            "host_id": host_id.as_ref(),
131            "image_ref": image_ref.as_ref(),
132            "max_instances": max_instances,
133            "error": format!("{error:#}"),
134        })
135    } else {
136        json!({
137            "annotations": annotations,
138            "component_id": component_id.as_ref(),
139            "host_id": host_id.as_ref(),
140            "image_ref": image_ref.as_ref(),
141            "max_instances": max_instances,
142            "error": format!("{error:#}"),
143        })
144    }
145}
146
147/// Generates an event payload for when a link definition is set
148///
149/// # Arguments
150/// * `link` - Link definition containing source, target, and interface information
151///
152/// # Returns
153/// JSON object containing complete link definition details
154pub fn linkdef_set(link: &Link) -> serde_json::Value {
155    json!({
156        "source_id": link.source_id(),
157        "target": link.target(),
158        "name": link.name(),
159        "wit_namespace": link.wit_namespace(),
160        "wit_package": link.wit_package(),
161        "interfaces": link.interfaces(),
162        "source_config": link.source_config(),
163        "target_config": link.target_config(),
164    })
165}
166
167/// Generates an event payload for when setting a link definition fails
168///
169/// # Arguments
170/// * `link` - Link definition that failed to be set
171/// * `error` - The error that caused the link definition failure
172///
173/// # Returns
174/// JSON object containing link definition details and error information
175pub fn linkdef_set_failed(link: &Link, error: &anyhow::Error) -> serde_json::Value {
176    json!({
177        "source_id": link.source_id(),
178        "target": link.target(),
179        "name": link.name(),
180        "wit_namespace": link.wit_namespace(),
181        "wit_package": link.wit_package(),
182        "interfaces": link.interfaces(),
183        "source_config": link.source_config(),
184        "target_config": link.target_config(),
185        "error": format!("{error:#}"),
186    })
187}
188
189/// Generates an event payload for when a link definition is deleted
190///
191/// # Arguments
192/// * `source_id` - ID of the source component
193/// * `target` - Optional target component ID
194/// * `name` - Name of the link
195/// * `wit_namespace` - WIT namespace for the link
196/// * `wit_package` - WIT package name
197/// * `interfaces` - Optional list of interface names
198///
199/// # Returns
200/// JSON object containing link deletion details
201pub fn linkdef_deleted(
202    source_id: impl AsRef<str>,
203    target: Option<&String>,
204    name: impl AsRef<str>,
205    wit_namespace: impl AsRef<str>,
206    wit_package: impl AsRef<str>,
207    interfaces: Option<&Vec<String>>,
208) -> serde_json::Value {
209    // Target and interfaces aren't known if the link didn't exist, so we omit them from the
210    // event data in that case.
211    if let (Some(target), Some(interfaces)) = (target, interfaces) {
212        json!({
213            "source_id": source_id.as_ref(),
214            "target": target,
215            "name": name.as_ref(),
216            "wit_namespace": wit_namespace.as_ref(),
217            "wit_package": wit_package.as_ref(),
218            "interfaces": interfaces,
219        })
220    } else {
221        json!({
222            "source_id": source_id.as_ref(),
223            "name": name.as_ref(),
224            "wit_namespace": wit_namespace.as_ref(),
225            "wit_package": wit_package.as_ref(),
226        })
227    }
228}
229
230/// Generates an event payload for when a provider starts
231///
232/// # Arguments
233/// * `claims` - Optional capability provider claims
234/// * `annotations` - Key-value pairs of metadata annotations
235/// * `host_id` - ID of the host where provider started
236/// * `image_ref` - Reference to the provider image
237/// * `provider_id` - Unique identifier for the provider
238///
239/// # Returns
240/// JSON object containing provider startup details and metadata
241pub fn provider_started(
242    claims: Option<&jwt::Claims<jwt::CapabilityProvider>>,
243    annotations: &BTreeMap<String, String>,
244    host_id: impl AsRef<str>,
245    image_ref: impl AsRef<str>,
246    provider_id: impl AsRef<str>,
247) -> serde_json::Value {
248    if let Some(claims) = claims {
249        let not_before_human = claims
250            .not_before
251            .map(|n| n.to_string())
252            .unwrap_or_else(|| "never".to_string());
253        let expires_human = claims
254            .expires
255            .map(|n| n.to_string())
256            .unwrap_or_else(|| "never".to_string());
257        let metadata = claims.metadata.as_ref();
258        json!({
259            "host_id": host_id.as_ref(),
260            "image_ref": image_ref.as_ref(),
261            "provider_id": provider_id.as_ref(),
262            "annotations": annotations,
263            "claims": {
264                "issuer": &claims.issuer,
265                "tags": None::<Vec<()>>, // present in OTP, but hardcoded to `None`
266                "name": metadata.map(|jwt::CapabilityProvider { name, .. }| name),
267                "version": metadata.map(|jwt::CapabilityProvider { ver, .. }| ver),
268                "not_before_human": not_before_human,
269                "expires_human": expires_human,
270            },
271            // TODO(#1548): remove these fields when we don't depend on them
272            "instance_id": provider_id.as_ref(),
273            "public_key": provider_id.as_ref(),
274            "link_name": "default",
275        })
276    } else {
277        json!({
278            "host_id": host_id.as_ref(),
279            "image_ref": image_ref.as_ref(),
280            "provider_id": provider_id.as_ref(),
281            "annotations": annotations,
282        })
283    }
284}
285
286/// Generates an event payload for when a provider fails to start
287///
288/// # Arguments
289/// * `provider_ref` - Reference to the provider image
290/// * `provider_id` - Unique identifier for the provider
291/// * `host_id` - ID of the host where start failed
292/// * `error` - The error that caused the start failure
293///
294/// # Returns
295/// JSON object containing provider start failure details
296pub fn provider_start_failed(
297    provider_ref: impl AsRef<str>,
298    provider_id: impl AsRef<str>,
299    host_id: impl AsRef<str>,
300    error: &anyhow::Error,
301) -> serde_json::Value {
302    json!({
303        "provider_ref": provider_ref.as_ref(),
304        "provider_id": provider_id.as_ref(),
305        "host_id": host_id.as_ref(),
306        "error": format!("{error:#}"),
307        // TODO(#1548): remove this field when we don't depend on it
308        "link_name": "default",
309    })
310}
311
312/// Generates an event payload for when a provider stops
313///
314/// # Arguments
315/// * `annotations` - Key-value pairs of metadata annotations
316/// * `host_id` - ID of the host where provider stopped
317/// * `provider_id` - Unique identifier for the provider
318/// * `reason` - Reason for stopping the provider
319///
320/// # Returns
321/// JSON object containing provider stop details
322pub fn provider_stopped(
323    annotations: &BTreeMap<String, String>,
324    host_id: impl AsRef<str>,
325    provider_id: impl AsRef<str>,
326    reason: impl AsRef<str>,
327) -> serde_json::Value {
328    json!({
329        "host_id": host_id.as_ref(),
330        "provider_id": provider_id.as_ref(),
331        "annotations": annotations,
332        "reason": reason.as_ref(),
333        // TODO(#1548): remove these fields when we don't depend on them
334        "instance_id": provider_id.as_ref(),
335        "public_key": provider_id.as_ref(),
336        "link_name": "default",
337    })
338}
339
340/// Generates an event payload for provider health checks
341///
342/// # Arguments
343/// * `host_id` - ID of the host performing the health check
344/// * `provider_id` - Unique identifier for the provider being checked
345///
346/// # Returns
347/// JSON object containing health check details
348pub fn provider_health_check(
349    host_id: impl AsRef<str>,
350    provider_id: impl AsRef<str>,
351) -> serde_json::Value {
352    json!({
353        "host_id": host_id.as_ref(),
354        "provider_id": provider_id.as_ref(),
355    })
356}
357
358/// Generates an event payload for when a config is set
359///
360/// # Arguments
361/// * `config_name` - Name of the configuration being set
362///
363/// # Returns
364/// JSON object containing config set details
365pub fn config_set(config_name: impl AsRef<str>) -> serde_json::Value {
366    json!({
367        "config_name": config_name.as_ref(),
368    })
369}
370
371/// Generates an event payload for when a config is deleted
372///
373/// # Arguments
374/// * `config_name` - Name of the configuration being deleted
375///
376/// # Returns
377/// JSON object containing config deletion details
378pub fn config_deleted(config_name: impl AsRef<str>) -> serde_json::Value {
379    json!({
380        "config_name": config_name.as_ref(),
381    })
382}
383
384/// Generates an event payload for when host labels are changed
385///
386/// # Arguments
387/// * `host_id` - ID of the host whose labels changed
388/// * `labels` - New set of labels as key-value pairs
389///
390/// # Returns
391/// JSON object containing updated label information
392pub fn labels_changed(
393    host_id: impl AsRef<str>,
394    labels: impl Into<HashMap<String, String>>,
395) -> serde_json::Value {
396    json!({
397        "host_id": host_id.as_ref(),
398        "labels": labels.into(),
399    })
400}