wasmcloud_host/
metrics.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use sysinfo::System;
5use tokio::task::JoinHandle;
6use wasmcloud_tracing::{
7 Counter, Gauge, Histogram, KeyValue, Meter, ObservableGauge, UpDownCounter,
8};
9
/// Interval between system-metric samples used when [`HostMetrics::new`] is given `None`.
const DEFAULT_REFRESH_TIME: Duration = Duration::from_millis(5_000);
11
12#[derive(Clone, Debug)]
14#[allow(clippy::module_name_repetitions)]
15pub struct HostMetrics {
16 pub handle_rpc_message_duration_ns: Histogram<u64>,
18 pub component_invocations: Counter<u64>,
20 pub component_errors: Counter<u64>,
22 pub component_active_instances: UpDownCounter<i64>,
24 pub component_max_instances: Gauge<u64>,
26
27 pub system_total_memory_bytes: ObservableGauge<u64>,
29 pub system_used_memory_bytes: ObservableGauge<u64>,
31 pub system_cpu_usage: ObservableGauge<f64>,
33
34 pub host_id: String,
39 pub lattice_id: String,
43
44 _refresh_task_handle: Arc<RefreshWrapper>,
46}
47
/// The latest system sample, shared between the refresh task and the observable-gauge
/// callbacks through a `tokio::sync::watch` channel.
struct SystemMetrics {
    /// Global CPU usage as reported by `sysinfo`.
    system_cpu_usage: f64,
    /// Total system memory in bytes.
    system_total_memory_bytes: u64,
    /// Used system memory in bytes.
    system_used_memory_bytes: u64,
}
55
56#[derive(Debug)]
61struct RefreshWrapper(JoinHandle<()>);
62
63impl Drop for RefreshWrapper {
64 fn drop(&mut self) {
65 self.0.abort();
66 }
67}
68
69impl HostMetrics {
70 pub fn new(
76 meter: &Meter,
77 host_id: String,
78 lattice_id: String,
79 refresh_time: Option<Duration>,
80 ) -> anyhow::Result<Self> {
81 let wasmcloud_host_handle_rpc_message_duration_ns = meter
82 .u64_histogram("wasmcloud_host.handle_rpc_message.duration")
83 .with_description("Duration in nanoseconds each handle_rpc_message operation took")
84 .with_unit("nanoseconds")
85 .build();
86
87 let component_invocation_count = meter
88 .u64_counter("wasmcloud_host.component.invocations")
89 .with_description("Number of component invocations")
90 .build();
91
92 let component_error_count = meter
93 .u64_counter("wasmcloud_host.component.invocation.errors")
94 .with_description("Number of component errors")
95 .build();
96
97 let component_active_instances = meter
98 .i64_up_down_counter("wasmcloud_host.component.active_instances")
99 .with_description("Number of active component instances")
100 .build();
101
102 let component_max_instances = meter
103 .u64_gauge("wasmcloud_host.component.max_instances")
104 .with_description("Maximum number of component instances")
105 .build();
106
107 let mut system = System::new();
108 system.refresh_memory();
110 system.refresh_cpu_usage();
111 let initial_metrics = SystemMetrics {
112 system_total_memory_bytes: system.total_memory(),
113 system_used_memory_bytes: system.used_memory(),
114 system_cpu_usage: system.global_cpu_usage() as f64,
115 };
116 let (tx, rx) = tokio::sync::watch::channel(initial_metrics);
117
118 let refresh_time = refresh_time.unwrap_or(DEFAULT_REFRESH_TIME);
119
120 let refresh_task_handle = tokio::spawn(async move {
121 loop {
122 system.refresh_memory();
123 system.refresh_cpu_usage();
124
125 tx.send_modify(|current| {
126 current.system_total_memory_bytes = system.total_memory();
127 current.system_used_memory_bytes = system.used_memory();
128 current.system_cpu_usage = system.global_cpu_usage() as f64;
129 });
130 tokio::time::sleep(refresh_time).await;
131 }
132 });
133 let system_memory_total_bytes = meter
135 .u64_observable_gauge("wasmcloud_host.process.memory.total.bytes")
136 .with_description("The total amount of memory in bytes")
137 .with_unit("bytes")
138 .with_callback({
139 let rx = rx.clone();
140 move |observer| {
141 let metrics = rx.borrow();
142 observer.observe(metrics.system_total_memory_bytes, &[]);
143 }
144 })
145 .build();
146
147 let system_memory_used_bytes = meter
148 .u64_observable_gauge("wasmcloud_host.process.memory.used.bytes")
149 .with_description("The used amount of memory in bytes")
150 .with_unit("bytes")
151 .with_callback({
152 let rx_clone = rx.clone();
153 move |observer| {
154 let metrics = rx_clone.borrow();
155 observer.observe(metrics.system_used_memory_bytes, &[]);
156 }
157 })
158 .build();
159
160 let system_cpu_usage = meter
162 .f64_observable_gauge("wasmcloud_host.process.cpu.usage")
163 .with_description("The CPU usage of the process")
164 .with_unit("percentage")
165 .with_callback({
166 let rx = rx.clone();
167 move |observer| {
168 let metrics = rx.borrow();
169 observer.observe(metrics.system_cpu_usage, &[]);
170 }
171 })
172 .build();
173
174 Ok(Self {
175 handle_rpc_message_duration_ns: wasmcloud_host_handle_rpc_message_duration_ns,
176 component_invocations: component_invocation_count,
177 component_errors: component_error_count,
178 component_active_instances,
179 component_max_instances,
180 system_total_memory_bytes: system_memory_total_bytes,
181 system_used_memory_bytes: system_memory_used_bytes,
182 system_cpu_usage,
183 host_id,
184 lattice_id,
185 _refresh_task_handle: Arc::new(RefreshWrapper(refresh_task_handle)),
186 })
187 }
188
189 pub(crate) fn increment_active_instance(&self, attributes: &[KeyValue]) {
191 self.component_active_instances.add(1, attributes);
192 }
193
194 pub(crate) fn decrement_active_instance(&self, attributes: &[KeyValue]) {
196 self.component_active_instances.add(-1, attributes);
197 }
198
199 pub(crate) fn set_max_instances(&self, max: u64, attributes: &[KeyValue]) {
201 self.component_max_instances.record(max, attributes);
202 }
203
204 pub(crate) fn record_component_invocation(
206 &self,
207 elapsed: u64,
208 attributes: &[KeyValue],
209 error: bool,
210 ) {
211 self.handle_rpc_message_duration_ns
212 .record(elapsed, attributes);
213 self.component_invocations.add(1, attributes);
214 if error {
215 self.component_errors.add(1, attributes);
216 }
217 }
218}