// wasmcloud_host/metrics.rs

use std::sync::Arc;
use std::time::Duration;

use sysinfo::System;
use tokio::task::JoinHandle;
use wasmcloud_tracing::{Counter, Histogram, KeyValue, Meter, ObservableGauge};

/// How often system-level metrics are refreshed when the caller of [`HostMetrics::new`] does not
/// supply a `refresh_time`.
const DEFAULT_REFRESH_TIME: Duration = Duration::from_secs(5);

/// `HostMetrics` encapsulates the set of metrics emitted by the wasmcloud host.
///
/// Cloning is cheap with respect to the background refresh task: all clones share the same task
/// through an [`Arc`], and the task is aborted once the last clone is dropped.
#[derive(Clone, Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct HostMetrics {
    /// Represents the time it took for each handle_rpc_message invocation in nanoseconds.
    pub handle_rpc_message_duration_ns: Histogram<u64>,
    /// The count of the number of times a component was invoked.
    pub component_invocations: Counter<u64>,
    /// The count of the number of times a component invocation resulted in an error.
    pub component_errors: Counter<u64>,

    /// Observable gauge reporting the total amount of system memory in bytes.
    pub system_total_memory_bytes: ObservableGauge<u64>,
    /// Observable gauge reporting the amount of used system memory in bytes.
    pub system_used_memory_bytes: ObservableGauge<u64>,
    /// Observable gauge reporting the system-wide (global) CPU usage as a percentage.
    pub system_cpu_usage: ObservableGauge<f64>,

    /// The host's ID.
    // TODO this is actually configured as an InstrumentationScope attribute on the global meter,
    // but we don't really have a way of getting at those. We should figure out a way to get at that
    // information so we don't have to duplicate it here.
    pub host_id: String,
    /// The host's lattice ID.
    // Eventually a host will be able to support multiple lattices, so this will need to either be
    // removed or metrics will need to be scoped per-lattice.
    pub lattice_id: String,

    // Handle to the background task that refreshes the system metrics. It is never read; it is
    // held only so that dropping the last clone of `HostMetrics` aborts the task.
    _refresh_task_handle: Arc<RefreshWrapper>,
}

/// A snapshot of the system-level readings.
///
/// The background refresh task publishes these values through a watch channel and the observable
/// gauge callbacks read them from it.
struct SystemMetrics {
    /// The total amount of system memory in bytes.
    system_total_memory_bytes: u64,
    /// The total amount of used system memory in bytes.
    system_used_memory_bytes: u64,
    /// The total (system-wide) cpu usage.
    system_cpu_usage: f64,
}

/// A helper struct that owns the handle of the system metrics refresh task and is meant to be
/// wrapped in an `Arc`.
///
/// When the final reference is removed, the `Drop` implementation aborts the refresh task. This
/// allows the metrics to be cloneable while still guaranteeing the task is cleaned up.
#[derive(Debug)]
struct RefreshWrapper(JoinHandle<()>);

impl Drop for RefreshWrapper {
    fn drop(&mut self) {
        self.0.abort();
    }
}

impl HostMetrics {
    /// Construct a new [`HostMetrics`] instance for accessing the various wasmcloud host metrics
    /// linked to the provided meter.
    ///
    /// The `refresh_time` is optional and defaults to 5 seconds. This time is used to configure how
    /// often system level metrics are refreshed
    pub fn new(
        meter: &Meter,
        host_id: String,
        lattice_id: String,
        refresh_time: Option<Duration>,
    ) -> anyhow::Result<Self> {
        let wasmcloud_host_handle_rpc_message_duration_ns = meter
            .u64_histogram("wasmcloud_host.handle_rpc_message.duration")
            .with_description("Duration in nanoseconds each handle_rpc_message operation took")
            .with_unit("nanoseconds")
            .build();

        let component_invocation_count = meter
            .u64_counter("wasmcloud_host.component.invocations")
            .with_description("Number of component invocations")
            .build();

        let component_error_count = meter
            .u64_counter("wasmcloud_host.component.invocation.errors")
            .with_description("Number of component errors")
            .build();

        let mut system = System::new();
        // Get the initial metrics
        system.refresh_memory();
        system.refresh_cpu_usage();
        let initial_metrics = SystemMetrics {
            system_total_memory_bytes: system.total_memory(),
            system_used_memory_bytes: system.used_memory(),
            system_cpu_usage: system.global_cpu_usage() as f64,
        };
        let (tx, rx) = tokio::sync::watch::channel(initial_metrics);

        let refresh_time = refresh_time.unwrap_or(DEFAULT_REFRESH_TIME);

        let refresh_task_handle = tokio::spawn(async move {
            loop {
                system.refresh_memory();
                system.refresh_cpu_usage();

                tx.send_modify(|current| {
                    current.system_total_memory_bytes = system.total_memory();
                    current.system_used_memory_bytes = system.used_memory();
                    current.system_cpu_usage = system.global_cpu_usage() as f64;
                });
                tokio::time::sleep(refresh_time).await;
            }
        });
        // System Memory
        let system_memory_total_bytes = meter
            .u64_observable_gauge("wasmcloud_host.process.memory.total.bytes")
            .with_description("The total amount of memory in bytes")
            .with_unit("bytes")
            .with_callback({
                let rx = rx.clone();
                move |observer| {
                    let metrics = rx.borrow();
                    observer.observe(metrics.system_total_memory_bytes, &[]);
                }
            })
            .build();

        let system_memory_used_bytes = meter
            .u64_observable_gauge("wasmcloud_host.process.memory.used.bytes")
            .with_description("The used amount of memory in bytes")
            .with_unit("bytes")
            .with_callback({
                let rx_clone = rx.clone();
                move |observer| {
                    let metrics = rx_clone.borrow();
                    observer.observe(metrics.system_used_memory_bytes, &[]);
                }
            })
            .build();

        // System CPU
        let system_cpu_usage = meter
            .f64_observable_gauge("wasmcloud_host.process.cpu.usage")
            .with_description("The CPU usage of the process")
            .with_unit("percentage")
            .with_callback({
                let rx = rx.clone();
                move |observer| {
                    let metrics = rx.borrow();
                    observer.observe(metrics.system_cpu_usage, &[]);
                }
            })
            .build();

        Ok(Self {
            handle_rpc_message_duration_ns: wasmcloud_host_handle_rpc_message_duration_ns,
            component_invocations: component_invocation_count,
            component_errors: component_error_count,
            system_total_memory_bytes: system_memory_total_bytes,
            system_used_memory_bytes: system_memory_used_bytes,
            system_cpu_usage,
            host_id,
            lattice_id,
            _refresh_task_handle: Arc::new(RefreshWrapper(refresh_task_handle)),
        })
    }

    /// Record the result of invoking a component, including the elapsed time, any attributes, and whether the invocation resulted in an error.
    pub(crate) fn record_component_invocation(
        &self,
        elapsed: u64,
        attributes: &[KeyValue],
        error: bool,
    ) {
        self.handle_rpc_message_duration_ns
            .record(elapsed, attributes);
        self.component_invocations.add(1, attributes);
        if error {
            self.component_errors.add(1, attributes);
        }
    }
}