wasmcloud_host/metrics.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use sysinfo::System;
5use tokio::task::JoinHandle;
6use wasmcloud_tracing::{
7    Counter, Gauge, Histogram, KeyValue, Meter, ObservableGauge, UpDownCounter,
8};
9
/// How often system-level metrics are re-sampled when the caller does not specify an interval.
const DEFAULT_REFRESH_TIME: Duration = Duration::from_millis(5_000);
11
12/// `HostMetrics` encapsulates the set of metrics emitted by the wasmcloud host
13#[derive(Clone, Debug)]
14#[allow(clippy::module_name_repetitions)]
15pub struct HostMetrics {
16    /// Represents the time it took for each handle_rpc_message invocation in nanoseconds.
17    pub handle_rpc_message_duration_ns: Histogram<u64>,
18    /// The count of the number of times an component was invoked.
19    pub component_invocations: Counter<u64>,
20    /// The count of the number of times an component invocation resulted in an error.
21    pub component_errors: Counter<u64>,
22    /// The number of active instances of a component.
23    pub component_active_instances: UpDownCounter<i64>,
24    /// The maximum number of instances of a component.
25    pub component_max_instances: Gauge<u64>,
26
27    /// The total amount of available system memory in bytes.
28    pub system_total_memory_bytes: ObservableGauge<u64>,
29    /// The total amount of used system memory in bytes.
30    pub system_used_memory_bytes: ObservableGauge<u64>,
31    /// The total cpu usage.
32    pub system_cpu_usage: ObservableGauge<f64>,
33
34    /// The host's ID.
35    // TODO this is actually configured as an InstrumentationScope attribute on the global meter,
36    // but we don't really have a way of getting at those. We should figure out a way to get at that
37    // information so we don't have to duplicate it here.
38    pub host_id: String,
39    /// The host's lattice ID.
40    // Eventually a host will be able to support multiple lattices, so this will need to either be
41    // removed or metrics will need to be scoped per-lattice.
42    pub lattice_id: String,
43
44    // Task handle for dropping when the metrics are no longer needed.
45    _refresh_task_handle: Arc<RefreshWrapper>,
46}
47
/// A single sample of the system-level metrics, shared with the observable gauges through a
/// watch channel.
#[derive(Debug, Clone, Copy)]
struct SystemMetrics {
    /// The total amount of system memory in bytes.
    system_total_memory_bytes: u64,
    /// The amount of system memory in use, in bytes.
    system_used_memory_bytes: u64,
    /// The global (system-wide) CPU usage, in percent.
    system_cpu_usage: f64,
}
55
56/// A helper struct for encapsulating the system metrics that should be wrapped in an Arc.
57///
58/// When the final reference is removed, the drop will abort the watch task. This allows the metrics
59/// to be clonable
60#[derive(Debug)]
61struct RefreshWrapper(JoinHandle<()>);
62
63impl Drop for RefreshWrapper {
64    fn drop(&mut self) {
65        self.0.abort();
66    }
67}
68
69impl HostMetrics {
70    /// Construct a new [`HostMetrics`] instance for accessing the various wasmcloud host metrics
71    /// linked to the provided meter.
72    ///
73    /// The `refresh_time` is optional and defaults to 5 seconds. This time is used to configure how
74    /// often system level metrics are refreshed
75    pub fn new(
76        meter: &Meter,
77        host_id: String,
78        lattice_id: String,
79        refresh_time: Option<Duration>,
80    ) -> anyhow::Result<Self> {
81        let wasmcloud_host_handle_rpc_message_duration_ns = meter
82            .u64_histogram("wasmcloud_host.handle_rpc_message.duration")
83            .with_description("Duration in nanoseconds each handle_rpc_message operation took")
84            .with_unit("nanoseconds")
85            .build();
86
87        let component_invocation_count = meter
88            .u64_counter("wasmcloud_host.component.invocations")
89            .with_description("Number of component invocations")
90            .build();
91
92        let component_error_count = meter
93            .u64_counter("wasmcloud_host.component.invocation.errors")
94            .with_description("Number of component errors")
95            .build();
96
97        let component_active_instances = meter
98            .i64_up_down_counter("wasmcloud_host.component.active_instances")
99            .with_description("Number of active component instances")
100            .build();
101
102        let component_max_instances = meter
103            .u64_gauge("wasmcloud_host.component.max_instances")
104            .with_description("Maximum number of component instances")
105            .build();
106
107        let mut system = System::new();
108        // Get the initial metrics
109        system.refresh_memory();
110        system.refresh_cpu_usage();
111        let initial_metrics = SystemMetrics {
112            system_total_memory_bytes: system.total_memory(),
113            system_used_memory_bytes: system.used_memory(),
114            system_cpu_usage: system.global_cpu_usage() as f64,
115        };
116        let (tx, rx) = tokio::sync::watch::channel(initial_metrics);
117
118        let refresh_time = refresh_time.unwrap_or(DEFAULT_REFRESH_TIME);
119
120        let refresh_task_handle = tokio::spawn(async move {
121            loop {
122                system.refresh_memory();
123                system.refresh_cpu_usage();
124
125                tx.send_modify(|current| {
126                    current.system_total_memory_bytes = system.total_memory();
127                    current.system_used_memory_bytes = system.used_memory();
128                    current.system_cpu_usage = system.global_cpu_usage() as f64;
129                });
130                tokio::time::sleep(refresh_time).await;
131            }
132        });
133        // System Memory
134        let system_memory_total_bytes = meter
135            .u64_observable_gauge("wasmcloud_host.process.memory.total.bytes")
136            .with_description("The total amount of memory in bytes")
137            .with_unit("bytes")
138            .with_callback({
139                let rx = rx.clone();
140                move |observer| {
141                    let metrics = rx.borrow();
142                    observer.observe(metrics.system_total_memory_bytes, &[]);
143                }
144            })
145            .build();
146
147        let system_memory_used_bytes = meter
148            .u64_observable_gauge("wasmcloud_host.process.memory.used.bytes")
149            .with_description("The used amount of memory in bytes")
150            .with_unit("bytes")
151            .with_callback({
152                let rx_clone = rx.clone();
153                move |observer| {
154                    let metrics = rx_clone.borrow();
155                    observer.observe(metrics.system_used_memory_bytes, &[]);
156                }
157            })
158            .build();
159
160        // System CPU
161        let system_cpu_usage = meter
162            .f64_observable_gauge("wasmcloud_host.process.cpu.usage")
163            .with_description("The CPU usage of the process")
164            .with_unit("percentage")
165            .with_callback({
166                let rx = rx.clone();
167                move |observer| {
168                    let metrics = rx.borrow();
169                    observer.observe(metrics.system_cpu_usage, &[]);
170                }
171            })
172            .build();
173
174        Ok(Self {
175            handle_rpc_message_duration_ns: wasmcloud_host_handle_rpc_message_duration_ns,
176            component_invocations: component_invocation_count,
177            component_errors: component_error_count,
178            component_active_instances,
179            component_max_instances,
180            system_total_memory_bytes: system_memory_total_bytes,
181            system_used_memory_bytes: system_memory_used_bytes,
182            system_cpu_usage,
183            host_id,
184            lattice_id,
185            _refresh_task_handle: Arc::new(RefreshWrapper(refresh_task_handle)),
186        })
187    }
188
189    /// Increment the number of active instances of a component.
190    pub(crate) fn increment_active_instance(&self, attributes: &[KeyValue]) {
191        self.component_active_instances.add(1, attributes);
192    }
193
194    /// Decrement the number of active instances of a component.
195    pub(crate) fn decrement_active_instance(&self, attributes: &[KeyValue]) {
196        self.component_active_instances.add(-1, attributes);
197    }
198
199    /// Set the maximum number of instances of a component.
200    pub(crate) fn set_max_instances(&self, max: u64, attributes: &[KeyValue]) {
201        self.component_max_instances.record(max, attributes);
202    }
203
204    /// Record the result of invoking a component, including the elapsed time, any attributes, and whether the invocation resulted in an error.
205    pub(crate) fn record_component_invocation(
206        &self,
207        elapsed: u64,
208        attributes: &[KeyValue],
209        error: bool,
210    ) {
211        self.handle_rpc_message_duration_ns
212            .record(elapsed, attributes);
213        self.component_invocations.add(1, attributes);
214        if error {
215            self.component_errors.add(1, attributes);
216        }
217    }
218}