wasmcloud_tracing/
traces.rs

1use std::env;
2use std::fs::File;
3use std::io::{BufWriter, IsTerminal};
4use std::path::Path;
5#[cfg(feature = "otel")]
6use std::sync::Arc;
7
8#[cfg(feature = "otel")]
9use anyhow::Context as _;
10#[cfg(feature = "otel")]
11use opentelemetry_otlp::WithExportConfig;
12use tracing::{Event, Subscriber};
13use tracing_flame::FlameLayer;
14use tracing_subscriber::filter::LevelFilter;
15use tracing_subscriber::fmt::format::{DefaultFields, Format, Full, Json, JsonFields, Writer};
16use tracing_subscriber::fmt::time::SystemTime;
17use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields};
18use tracing_subscriber::layer::SubscriberExt;
19use tracing_subscriber::registry::LookupSpan;
20use tracing_subscriber::EnvFilter;
21#[cfg(feature = "otel")]
22use tracing_subscriber::Layer;
23use wasmcloud_core::logging::Level;
24use wasmcloud_core::OtelConfig;
25#[cfg(feature = "otel")]
26use wasmcloud_core::OtelProtocol;
27
/// Set-once cell type used to hold the OTEL logger provider.
#[cfg(feature = "otel")]
type LoggerProviderCell = once_cell::sync::OnceCell<opentelemetry_sdk::logs::SdkLoggerProvider>;

/// Process-wide home for the OTEL logger provider.
///
/// Being a `static`, it is never dropped, so the provider stays alive for as long as the logging
/// bridge layer built from it may be used. It is filled at most once, by `get_otel_logging_layer`.
#[cfg(feature = "otel")]
static LOG_PROVIDER: LoggerProviderCell = LoggerProviderCell::new();
31
32/// A struct that allows us to dynamically choose JSON formatting without using dynamic dispatch.
33/// This is just so we avoid any sort of possible slow down in logging code
34enum JsonOrNot {
35    Not(Format<Full, SystemTime>),
36    Json(Format<Json, SystemTime>),
37}
38
39impl<S, N> FormatEvent<S, N> for JsonOrNot
40where
41    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
42    N: for<'writer> FormatFields<'writer> + 'static,
43{
44    fn format_event(
45        &self,
46        ctx: &FmtContext<'_, S, N>,
47        writer: Writer<'_>,
48        event: &Event<'_>,
49    ) -> std::fmt::Result
50    where
51        S: Subscriber + for<'a> LookupSpan<'a>,
52    {
53        match self {
54            JsonOrNot::Not(f) => f.format_event(ctx, writer, event),
55            JsonOrNot::Json(f) => f.format_event(ctx, writer, event),
56        }
57    }
58}
59
60/// This guard prevents early `drop()`ing of the tracing related internal data structures
61pub struct FlushGuard {
62    _stderr: tracing_appender::non_blocking::WorkerGuard,
63    _flame: Option<tracing_flame::FlushGuard<BufWriter<File>>>,
64}
65
66/// Configures a global tracing subscriber, which includes:
67/// - A level filter, which forms the base and applies to all other layers
68/// - A local logging layer, which is either plaintext or structured (JSON)
69///
70/// # Errors
71///
72/// This will return an error if the function has already been called, or if we fail to create any
73/// of the layers
74#[cfg(not(feature = "otel"))]
75pub fn configure_tracing(
76    _: &str,
77    _: &OtelConfig,
78    use_structured_logging: bool,
79    flame_graph: Option<impl AsRef<Path>>,
80    log_level_override: Option<&Level>,
81) -> anyhow::Result<(tracing::Dispatch, FlushGuard)> {
82    let flame = flame_graph.map(FlameLayer::with_file).transpose()?;
83    let (flame, flame_guard) = flame.map(|(l, g)| (Some(l), Some(g))).unwrap_or_default();
84    let reg = tracing_subscriber::Registry::default()
85        .with(get_log_level_filter(log_level_override))
86        .with(flame);
87    let stderr = std::io::stderr();
88    let ansi = stderr.is_terminal();
89    let (stderr, stderr_guard) = tracing_appender::non_blocking(stderr);
90    let fmt = tracing_subscriber::fmt::layer()
91        .with_writer(stderr)
92        .with_ansi(ansi);
93
94    let dispatch = if use_structured_logging {
95        reg.with(
96            fmt.event_format(JsonOrNot::Json(Format::default().json()))
97                .fmt_fields(JsonFields::new()),
98        )
99        .into()
100    } else {
101        reg.with(
102            fmt.event_format(JsonOrNot::Not(Format::default()))
103                .fmt_fields(DefaultFields::new()),
104        )
105        .into()
106    };
107
108    Ok((
109        dispatch,
110        FlushGuard {
111            _stderr: stderr_guard,
112            _flame: flame_guard,
113        },
114    ))
115}
116
/// Builds a tracing [`Dispatch`](tracing::Dispatch) made of:
/// - OTEL tracing and logging layers, each added only when enabled in `otel_config`
/// - An optional flame graph layer writing to `flame_graph`, when a path is given
/// - A local stderr logging layer, either plaintext or structured (JSON)
///
/// Every layer carries its own per-layer filter:
/// - The stderr and OTEL logging layers use the log level filter (`get_log_level_filter`).
/// - The OTEL tracing and flame graph layers use the trace level filter (`get_trace_level_filter`,
///   `DEBUG` unless `trace_level_override` says otherwise).
///
/// There is deliberately no global filter. It would cap span collection at the log level and make
/// `trace_level_override` ineffective whenever it is more verbose than the log level.
///
/// The dispatcher is returned, not installed; the caller decides whether to make it the default.
/// The returned [`FlushGuard`] must be kept alive for as long as the dispatcher is in use.
///
/// # Errors
///
/// Returns an error if:
/// - any enabled layer fails to build (for example an OTLP exporter or the flame graph file), or
/// - OTEL logging is enabled and a logger provider was already installed by a previous call.
#[cfg(feature = "otel")]
pub fn configure_tracing(
    service_name: &str,
    otel_config: &OtelConfig,
    use_structured_logging: bool,
    flame_graph: Option<impl AsRef<Path>>,
    log_level_override: Option<&Level>,
    trace_level_override: Option<&Level>,
) -> anyhow::Result<(tracing::Dispatch, FlushGuard)> {
    let service_name: Arc<str> = Arc::from(service_name);

    let log_level_filter = get_log_level_filter(log_level_override);
    let traces = otel_config
        .traces_enabled()
        .then(|| {
            get_otel_tracing_layer(
                Arc::clone(&service_name),
                otel_config,
                get_trace_level_filter(trace_level_override),
            )
        })
        .transpose()?;
    let logs = otel_config
        .logs_enabled()
        .then(|| get_otel_logging_layer(Arc::clone(&service_name), otel_config, log_level_override))
        .transpose()?;
    let flame = flame_graph
        .map(FlameLayer::with_file)
        .transpose()
        .context("failed to create flame graph layer")?;
    let (flame, flame_guard) = flame
        .map(|(layer, guard)| {
            (
                Some(layer.with_filter(get_trace_level_filter(trace_level_override))),
                Some(guard),
            )
        })
        .unwrap_or_default();

    // No global filter here on purpose: each layer has its own per-layer filter, and a global
    // `EnvFilter` would disable spans for *all* layers, including the trace-level ones.
    let registry = tracing_subscriber::Registry::default()
        .with(traces)
        .with(logs)
        .with(flame);

    let stderr = std::io::stderr();
    // Only emit ANSI colour codes when stderr is an interactive terminal.
    let ansi = stderr.is_terminal();
    let (stderr, stderr_guard) = tracing_appender::non_blocking(stderr);
    let fmt = tracing_subscriber::fmt::layer()
        .with_writer(stderr)
        .with_ansi(ansi);

    let dispatch = if use_structured_logging {
        registry
            .with(
                fmt.event_format(JsonOrNot::Json(Format::default().json()))
                    .fmt_fields(JsonFields::new())
                    .with_filter(log_level_filter),
            )
            .into()
    } else {
        registry
            .with(
                fmt.event_format(JsonOrNot::Not(Format::default()))
                    .fmt_fields(DefaultFields::new())
                    .with_filter(log_level_filter),
            )
            .into()
    };

    Ok((
        dispatch,
        FlushGuard {
            _stderr: stderr_guard,
            _flame: flame_guard,
        },
    ))
}
199
/// Builds the OpenTelemetry layer that exports spans over OTLP.
///
/// - The exporter uses `otel_config.protocol` (HTTP with binary protobuf, or gRPC) and sends to
///   `otel_config.traces_endpoint()`.
/// - Spans are sampled according to `traces_sampler` / `traces_sampler_arg` (see `get_sampler`).
/// - Export goes through a batch span processor driven by the Tokio runtime. The processor is
///   tuned by `max_batch_queue_size` and `concurrent_exports` when those are set.
/// - The resource carries a `service.name` attribute plus whatever `EnvResourceDetector` reports.
///
/// The returned layer only sees spans accepted by `trace_level_filter`.
///
/// # Errors
///
/// Returns an error if the HTTP client or the OTLP span exporter cannot be built.
#[cfg(feature = "otel")]
fn get_otel_tracing_layer<S>(
    service_name: Arc<str>,
    otel_config: &OtelConfig,
    trace_level_filter: EnvFilter,
) -> anyhow::Result<impl tracing_subscriber::Layer<S>>
where
    S: Subscriber,
    S: for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry_otlp::WithHttpConfig;
    use opentelemetry_sdk::trace::BatchConfigBuilder;
    use tracing_opentelemetry::OpenTelemetryLayer;

    let exporter = match otel_config.protocol {
        OtelProtocol::Http => {
            let client = crate::get_http_client(otel_config)
                .context("failed to get an http client for otel tracing exporter")?;
            opentelemetry_otlp::SpanExporter::builder()
                .with_http()
                .with_http_client(client)
                .with_endpoint(otel_config.traces_endpoint())
                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
                .build()
                .context("failed to build OTEL span exporter")?
        }
        OtelProtocol::Grpc => {
            // TODO(joonas): Configure tonic::transport::ClientTlsConfig via .with_tls_config(...), passing in additional certificates.
            opentelemetry_otlp::SpanExporter::builder()
                .with_tonic()
                .with_endpoint(otel_config.traces_endpoint())
                .build()
                .context("failed to build OTEL span exporter")?
        }
    };

    let sampler = get_sampler(otel_config);

    let mut batch_builder = BatchConfigBuilder::default();
    if let Some(max_batch_queue_size) = otel_config.max_batch_queue_size {
        batch_builder = batch_builder.with_max_queue_size(max_batch_queue_size);
    }
    if let Some(concurrent_exports) = otel_config.concurrent_exports {
        batch_builder = batch_builder.with_max_concurrent_exports(concurrent_exports);
    }
    let batch_config = batch_builder.build();

    let processor =
        opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder(
            exporter,
            opentelemetry_sdk::runtime::Tokio,
        )
        .with_batch_config(batch_config)
        .build();

    let tracer = opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_sampler(sampler)
        .with_resource(
            opentelemetry_sdk::Resource::builder_empty()
                .with_detector(Box::new(
                    opentelemetry_sdk::resource::EnvResourceDetector::new(),
                ))
                .with_attribute(opentelemetry::KeyValue::new(
                    "service.name",
                    service_name.to_string(),
                ))
                .build(),
        )
        .with_span_processor(processor)
        .build()
        .tracer("wasmcloud-tracing");

    Ok(OpenTelemetryLayer::new(tracer).with_filter(trace_level_filter))
}

/// Translates the `traces_sampler` setting from `otel_config` into an SDK sampler.
///
/// With no sampler configured, and for unrecognised names, `parentbased_always_on` is used; the
/// latter case also prints a warning to stderr.
//
// NOTE(thomastaylor312): This is copied and modified from the opentelemetry-sdk crate. We
// currently need this because providers map config back into the vars needed to configure the
// SDK. When we update providers to be managed externally and remove host-managed ones, we can
// remove this. But for now we need to parse all the possible options
#[cfg(feature = "otel")]
fn get_sampler(otel_config: &OtelConfig) -> opentelemetry_sdk::trace::Sampler {
    use opentelemetry_sdk::trace::Sampler;

    match otel_config.traces_sampler.as_deref() {
        Some("always_on") => Sampler::AlwaysOn,
        Some("always_off") => Sampler::AlwaysOff,
        Some("traceidratio") => Sampler::TraceIdRatioBased(get_sampler_ratio(otel_config)),
        Some("parentbased_always_on") => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
        Some("parentbased_always_off") => Sampler::ParentBased(Box::new(Sampler::AlwaysOff)),
        Some("parentbased_traceidratio") => Sampler::ParentBased(Box::new(
            Sampler::TraceIdRatioBased(get_sampler_ratio(otel_config)),
        )),
        Some(s) => {
            eprintln!("Unrecognised or unimplemented OTEL_TRACES_SAMPLER value: {s}. Falling back to default: parentbased_always_on");
            Sampler::ParentBased(Box::new(Sampler::AlwaysOn))
        }
        None => Sampler::ParentBased(Box::new(Sampler::AlwaysOn)),
    }
}

/// Reads `traces_sampler_arg` as the ratio for the ratio-based samplers.
///
/// A missing or unparsable value falls back to `1.0` after printing a warning to stderr. The
/// parsed value is not range-checked here.
#[cfg(feature = "otel")]
fn get_sampler_ratio(otel_config: &OtelConfig) -> f64 {
    otel_config
        .traces_sampler_arg
        .as_ref()
        .and_then(|r| r.parse::<f64>().ok())
        .unwrap_or_else(|| {
            eprintln!("Missing or invalid OTEL_TRACES_SAMPLER_ARG value. Falling back to default: 1.0");
            1.0
        })
}
313
/// Builds the OpenTelemetry bridge layer that forwards `tracing` events as OTLP log records.
///
/// - The exporter uses `otel_config.protocol` (HTTP with binary protobuf, or gRPC) and sends to
///   `otel_config.logs_endpoint()`.
/// - Export goes through a batch log processor driven by the Tokio runtime.
/// - The logger provider is stored in `LOG_PROVIDER` so that it outlives this function.
/// - The layer is filtered with `get_log_level_filter(log_level_override)`.
///
/// # Errors
///
/// Returns an error if:
/// - a logger provider has already been stored (i.e. this ran before), or
/// - the HTTP client or the OTLP log exporter cannot be built.
#[cfg(feature = "otel")]
fn get_otel_logging_layer<S>(
    service_name: Arc<str>,
    otel_config: &OtelConfig,
    log_level_override: Option<&Level>,
) -> anyhow::Result<impl tracing_subscriber::Layer<S>>
where
    S: Subscriber,
    S: for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    use opentelemetry_otlp::WithHttpConfig;

    // `LOG_PROVIDER` can only be set once. Checking up front avoids building an exporter and a
    // batch processor that would be thrown away; the `set` below still guards against a race.
    if LOG_PROVIDER.get().is_some() {
        anyhow::bail!("Logger provider already initialized");
    }

    let exporter = match otel_config.protocol {
        OtelProtocol::Http => {
            let client = crate::get_http_client(otel_config)
                .context("failed to get an http client for otel logging exporter")?;
            opentelemetry_otlp::LogExporter::builder()
                .with_http()
                .with_http_client(client)
                .with_endpoint(otel_config.logs_endpoint())
                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
                .build()
                .context("failed to create OTEL http log exporter")?
        }
        OtelProtocol::Grpc => {
            // TODO(joonas): Configure tonic::transport::ClientTlsConfig via .with_tls_config(...), passing in additional certificates.
            opentelemetry_otlp::LogExporter::builder()
                .with_tonic()
                .with_endpoint(otel_config.logs_endpoint())
                .build()
                .context("failed to create OTEL grpc log exporter")?
        }
    };

    let processor =
        opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor::builder(
            exporter,
            opentelemetry_sdk::runtime::Tokio,
        )
        .build();

    let log_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
        .with_resource(
            opentelemetry_sdk::Resource::builder_empty()
                .with_detector(Box::new(
                    opentelemetry_sdk::resource::EnvResourceDetector::new(),
                ))
                .with_attribute(opentelemetry::KeyValue::new(
                    "service.name",
                    service_name.to_string(),
                ))
                .build(),
        )
        .with_log_processor(processor)
        .build();

    // Store the provider in a static so the exporter/provider is never dropped while in use.
    LOG_PROVIDER
        .set(log_provider)
        .map_err(|_| anyhow::anyhow!("Logger provider already initialized"))?;
    let log_provider = LOG_PROVIDER
        .get()
        .context("OTEL logger provider missing right after being set")?;

    let log_layer = opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
        log_provider,
    )
    .with_filter(get_log_level_filter(log_level_override));

    Ok(log_layer)
}
382
/// Builds the filter used by the span-collecting layers (OTEL tracing and flame graph).
///
/// The maximum level comes from `trace_level_override` when given, and is `DEBUG` otherwise.
#[cfg(feature = "otel")]
fn get_trace_level_filter(trace_level_override: Option<&Level>) -> EnvFilter {
    let max_level = trace_level_override.map_or(LevelFilter::DEBUG, wasi_level_to_tracing_level);
    EnvFilter::default().add_directive(max_level.into())
}
392
393fn get_log_level_filter(log_level_override: Option<&Level>) -> EnvFilter {
394    if let Some(log_level) = log_level_override {
395        let level = wasi_level_to_tracing_level(log_level);
396        // SAFETY: We can unwrap here because we control all inputs
397        let mut filter = EnvFilter::builder()
398            .with_default_directive(level.into())
399            .parse("")
400            .unwrap()
401            .add_directive("async_nats=info".parse().unwrap())
402            .add_directive("cranelift_codegen=warn".parse().unwrap())
403            .add_directive("hyper=info".parse().unwrap())
404            .add_directive("oci_client=info".parse().unwrap());
405
406        // Allow RUST_LOG to override the other directives
407        if let Ok(rust_log) = env::var("RUST_LOG") {
408            match rust_log
409                .split(',')
410                .map(str::parse)
411                .collect::<Result<Vec<_>, _>>()
412            {
413                Ok(directives) => {
414                    for directive in directives {
415                        filter = filter.add_directive(directive);
416                    }
417                }
418                Err(err) => {
419                    eprintln!("ERROR: Ignoring invalid RUST_LOG directive: {err}");
420                }
421            }
422        }
423
424        filter
425    } else {
426        EnvFilter::default().add_directive(LevelFilter::INFO.into())
427    }
428}
429
430fn wasi_level_to_tracing_level(level: &Level) -> LevelFilter {
431    match level {
432        Level::Error | Level::Critical => LevelFilter::ERROR,
433        Level::Warn => LevelFilter::WARN,
434        Level::Info => LevelFilter::INFO,
435        Level::Debug => LevelFilter::DEBUG,
436        Level::Trace => LevelFilter::TRACE,
437    }
438}