1use std::env;
2use std::fs::File;
3use std::io::{BufWriter, IsTerminal};
4use std::path::Path;
5#[cfg(feature = "otel")]
6use std::sync::Arc;
7
8#[cfg(feature = "otel")]
9use anyhow::Context as _;
10#[cfg(feature = "otel")]
11use opentelemetry_otlp::WithExportConfig;
12use tracing::{Event, Subscriber};
13use tracing_flame::FlameLayer;
14use tracing_subscriber::filter::LevelFilter;
15use tracing_subscriber::fmt::format::{DefaultFields, Format, Full, Json, JsonFields, Writer};
16use tracing_subscriber::fmt::time::SystemTime;
17use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields};
18use tracing_subscriber::layer::SubscriberExt;
19use tracing_subscriber::registry::LookupSpan;
20use tracing_subscriber::EnvFilter;
21#[cfg(feature = "otel")]
22use tracing_subscriber::Layer;
23use wasmcloud_core::logging::Level;
24use wasmcloud_core::OtelConfig;
25#[cfg(feature = "otel")]
26use wasmcloud_core::OtelProtocol;
27
/// Process-wide OTEL logger provider.
///
/// It is populated at most once, by `get_otel_logging_layer`; a second attempt to set it is
/// reported as an error there.
#[cfg(feature = "otel")]
static LOG_PROVIDER: once_cell::sync::OnceCell<opentelemetry_sdk::logs::SdkLoggerProvider> =
    once_cell::sync::OnceCell::new();
31
32enum JsonOrNot {
35 Not(Format<Full, SystemTime>),
36 Json(Format<Json, SystemTime>),
37}
38
39impl<S, N> FormatEvent<S, N> for JsonOrNot
40where
41 S: Subscriber + for<'lookup> LookupSpan<'lookup>,
42 N: for<'writer> FormatFields<'writer> + 'static,
43{
44 fn format_event(
45 &self,
46 ctx: &FmtContext<'_, S, N>,
47 writer: Writer<'_>,
48 event: &Event<'_>,
49 ) -> std::fmt::Result
50 where
51 S: Subscriber + for<'a> LookupSpan<'a>,
52 {
53 match self {
54 JsonOrNot::Not(f) => f.format_event(ctx, writer, event),
55 JsonOrNot::Json(f) => f.format_event(ctx, writer, event),
56 }
57 }
58}
59
60pub struct FlushGuard {
62 _stderr: tracing_appender::non_blocking::WorkerGuard,
63 _flame: Option<tracing_flame::FlushGuard<BufWriter<File>>>,
64}
65
66#[cfg(not(feature = "otel"))]
75pub fn configure_tracing(
76 _: &str,
77 _: &OtelConfig,
78 use_structured_logging: bool,
79 flame_graph: Option<impl AsRef<Path>>,
80 log_level_override: Option<&Level>,
81) -> anyhow::Result<(tracing::Dispatch, FlushGuard)> {
82 let flame = flame_graph.map(FlameLayer::with_file).transpose()?;
83 let (flame, flame_guard) = flame.map(|(l, g)| (Some(l), Some(g))).unwrap_or_default();
84 let reg = tracing_subscriber::Registry::default()
85 .with(get_log_level_filter(log_level_override))
86 .with(flame);
87 let stderr = std::io::stderr();
88 let ansi = stderr.is_terminal();
89 let (stderr, stderr_guard) = tracing_appender::non_blocking(stderr);
90 let fmt = tracing_subscriber::fmt::layer()
91 .with_writer(stderr)
92 .with_ansi(ansi);
93
94 let dispatch = if use_structured_logging {
95 reg.with(
96 fmt.event_format(JsonOrNot::Json(Format::default().json()))
97 .fmt_fields(JsonFields::new()),
98 )
99 .into()
100 } else {
101 reg.with(
102 fmt.event_format(JsonOrNot::Not(Format::default()))
103 .fmt_fields(DefaultFields::new()),
104 )
105 .into()
106 };
107
108 Ok((
109 dispatch,
110 FlushGuard {
111 _stderr: stderr_guard,
112 _flame: flame_guard,
113 },
114 ))
115}
116
/// Builds the tracing dispatcher for builds compiled with the `otel` feature.
///
/// The resulting subscriber consists of the following layers, each carrying its own
/// per-layer filter:
///
/// - an OTEL tracing layer (only when `otel_config.traces_enabled()`), filtered by the trace
///   level filter derived from `trace_level_override`,
/// - an OTEL logging layer (only when `otel_config.logs_enabled()`), filtered by the log
///   level filter derived from `log_level_override`,
/// - an optional flame graph layer writing to the `flame_graph` path, filtered by the trace
///   level filter,
/// - a `fmt` layer writing to stderr through a non-blocking writer, filtered by the log level
///   filter; JSON when `use_structured_logging` is set, plain otherwise. ANSI colours are
///   enabled only when stderr is a terminal.
///
/// No filter is installed on the registry itself. A filter added directly to the registry
/// acts globally: whatever it rejects is hidden from every other layer. Installing the log
/// level filter there would stop the tracing and flame layers from ever seeing spans more
/// verbose than the log level, making `trace_level_override` ineffective.
///
/// # Errors
///
/// Returns an error if an enabled OTEL layer cannot be constructed or if the flame graph
/// layer cannot be created for the given path.
#[cfg(feature = "otel")]
pub fn configure_tracing(
    service_name: &str,
    otel_config: &OtelConfig,
    use_structured_logging: bool,
    flame_graph: Option<impl AsRef<Path>>,
    log_level_override: Option<&Level>,
    trace_level_override: Option<&Level>,
) -> anyhow::Result<(tracing::Dispatch, FlushGuard)> {
    let service_name: Arc<str> = Arc::from(service_name);

    let log_level_filter = get_log_level_filter(log_level_override);
    let traces = otel_config
        .traces_enabled()
        .then(|| {
            get_otel_tracing_layer(
                Arc::clone(&service_name),
                otel_config,
                get_trace_level_filter(trace_level_override),
            )
        })
        .transpose()?;
    let logs = otel_config
        .logs_enabled()
        .then(|| get_otel_logging_layer(Arc::clone(&service_name), otel_config, log_level_override))
        .transpose()?;
    let flame = flame_graph.map(FlameLayer::with_file).transpose()?;
    let (flame, flame_guard) = flame
        .map(|(layer, guard)| {
            (
                Some(layer.with_filter(get_trace_level_filter(trace_level_override))),
                Some(guard),
            )
        })
        .unwrap_or_default();

    // Only per-layer filtered layers are attached here; see the function docs for why the
    // log level filter must not be installed globally on the registry.
    let registry = tracing_subscriber::Registry::default()
        .with(traces)
        .with(logs)
        .with(flame);

    // Query the terminal state before stderr is moved into the non-blocking writer.
    let stderr = std::io::stderr();
    let ansi = stderr.is_terminal();
    let (stderr, stderr_guard) = tracing_appender::non_blocking(stderr);
    let fmt = tracing_subscriber::fmt::layer()
        .with_writer(stderr)
        .with_ansi(ansi);

    let dispatch = if use_structured_logging {
        registry
            .with(
                fmt.event_format(JsonOrNot::Json(Format::default().json()))
                    .fmt_fields(JsonFields::new())
                    .with_filter(log_level_filter),
            )
            .into()
    } else {
        registry
            .with(
                fmt.event_format(JsonOrNot::Not(Format::default()))
                    .fmt_fields(DefaultFields::new())
                    .with_filter(log_level_filter),
            )
            .into()
    };

    Ok((
        dispatch,
        FlushGuard {
            _stderr: stderr_guard,
            _flame: flame_guard,
        },
    ))
}
199
/// Builds the OTEL span-exporting layer, wrapped in `trace_level_filter`.
///
/// The exporter is created for the protocol in `otel_config.protocol` (HTTP with a client
/// from `crate::get_http_client`, or gRPC via tonic) and targets
/// `otel_config.traces_endpoint()`. Spans are batched on the Tokio runtime; the queue size and
/// the number of concurrent exports are taken from `otel_config` when set.
///
/// The sampler is selected from `otel_config.traces_sampler`; ratio-based samplers read
/// `otel_config.traces_sampler_arg`. A missing sampler means `parentbased_always_on`. An
/// unrecognised sampler, or a missing/unparsable ratio, prints a warning to stderr and falls
/// back to a default instead of failing.
///
/// # Errors
///
/// Returns an error if the HTTP client or the span exporter cannot be built.
#[cfg(feature = "otel")]
fn get_otel_tracing_layer<S>(
    service_name: Arc<str>,
    otel_config: &OtelConfig,
    trace_level_filter: EnvFilter,
) -> anyhow::Result<impl tracing_subscriber::Layer<S>>
where
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry_otlp::WithHttpConfig;
    use opentelemetry_sdk::trace::{BatchConfigBuilder, Sampler};
    use tracing_opentelemetry::OpenTelemetryLayer;

    let exporter = match otel_config.protocol {
        OtelProtocol::Http => {
            let http_client = crate::get_http_client(otel_config)
                .context("failed to get an http client for otel tracing exporter")?;
            opentelemetry_otlp::SpanExporter::builder()
                .with_http()
                .with_http_client(http_client)
                .with_endpoint(otel_config.traces_endpoint())
                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
                .build()
                .context("failed to build OTEL span exporter")?
        }
        OtelProtocol::Grpc => opentelemetry_otlp::SpanExporter::builder()
            .with_tonic()
            .with_endpoint(otel_config.traces_endpoint())
            .build()
            .context("failed to build OTEL span exporter")?,
    };

    // Reads the sampler argument as a ratio; only invoked by the ratio-based samplers, so the
    // warning is printed only when such a sampler is actually selected.
    let ratio_or_default = || {
        otel_config
            .traces_sampler_arg
            .as_ref()
            .and_then(|r| r.parse::<f64>().ok())
            .unwrap_or_else(|| {
                eprintln!("Missing or invalid OTEL_TRACES_SAMPLER_ARG value. Falling back to default: 1.0");
                1.0
            })
    };
    let parent_based = |inner: Sampler| Sampler::ParentBased(Box::new(inner));

    let sampler = match otel_config.traces_sampler.as_deref() {
        Some("always_on") => Sampler::AlwaysOn,
        Some("always_off") => Sampler::AlwaysOff,
        Some("traceidratio") => Sampler::TraceIdRatioBased(ratio_or_default()),
        Some("parentbased_always_on") | None => parent_based(Sampler::AlwaysOn),
        Some("parentbased_always_off") => parent_based(Sampler::AlwaysOff),
        Some("parentbased_traceidratio") => {
            parent_based(Sampler::TraceIdRatioBased(ratio_or_default()))
        }
        Some(s) => {
            eprintln!("Unrecognised or unimplemented OTEL_TRACES_SAMPLER value: {s}. Falling back to default: parentbased_always_on");
            parent_based(Sampler::AlwaysOn)
        }
    };

    let batch_config = {
        let mut builder = BatchConfigBuilder::default();
        if let Some(queue_size) = otel_config.max_batch_queue_size {
            builder = builder.with_max_queue_size(queue_size);
        }
        if let Some(exports) = otel_config.concurrent_exports {
            builder = builder.with_max_concurrent_exports(exports);
        }
        builder.build()
    };

    let processor =
        opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder(
            exporter,
            opentelemetry_sdk::runtime::Tokio,
        )
        .with_batch_config(batch_config)
        .build();

    // Resource attributes come from the environment detector plus an explicit `service.name`.
    let resource = opentelemetry_sdk::Resource::builder_empty()
        .with_detector(Box::new(
            opentelemetry_sdk::resource::EnvResourceDetector::new(),
        ))
        .with_attribute(opentelemetry::KeyValue::new(
            "service.name",
            service_name.to_string(),
        ))
        .build();

    let tracer = opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_sampler(sampler)
        .with_resource(resource)
        .with_span_processor(processor)
        .build()
        .tracer("wasmcloud-tracing");

    Ok(OpenTelemetryLayer::new(tracer).with_filter(trace_level_filter))
}
313
/// Builds the OTEL log-exporting layer, filtered by the log level filter derived from
/// `log_level_override`.
///
/// The exporter is created for the protocol in `otel_config.protocol` (HTTP with a client
/// from `crate::get_http_client`, or gRPC via tonic) and targets
/// `otel_config.logs_endpoint()`. Log records are batched on the Tokio runtime. The logger
/// provider is stored in the `LOG_PROVIDER` static and the returned layer bridges `tracing`
/// events to it.
///
/// # Errors
///
/// Returns an error if the HTTP client or the log exporter cannot be built, or if
/// `LOG_PROVIDER` has already been set (for example by an earlier successful call).
#[cfg(feature = "otel")]
fn get_otel_logging_layer<S>(
    service_name: Arc<str>,
    otel_config: &OtelConfig,
    log_level_override: Option<&Level>,
) -> anyhow::Result<impl tracing_subscriber::Layer<S>>
where
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    use opentelemetry_otlp::WithHttpConfig;

    let exporter = match otel_config.protocol {
        OtelProtocol::Http => {
            let client = crate::get_http_client(otel_config)
                .context("failed to get an http client for otel logging exporter")?;
            opentelemetry_otlp::LogExporter::builder()
                .with_http()
                .with_http_client(client)
                .with_endpoint(otel_config.logs_endpoint())
                .with_protocol(opentelemetry_otlp::Protocol::HttpBinary)
                .build()
                .context("failed to create OTEL http log exporter")?
        }
        OtelProtocol::Grpc => opentelemetry_otlp::LogExporter::builder()
            .with_tonic()
            .with_endpoint(otel_config.logs_endpoint())
            .build()
            // Name the transport actually in use so failures are not misattributed to HTTP.
            .context("failed to create OTEL grpc log exporter")?,
    };

    let processor =
        opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor::builder(
            exporter,
            opentelemetry_sdk::runtime::Tokio,
        )
        .build();

    // Resource attributes come from the environment detector plus an explicit `service.name`.
    let resource = opentelemetry_sdk::Resource::builder_empty()
        .with_detector(Box::new(
            opentelemetry_sdk::resource::EnvResourceDetector::new(),
        ))
        .with_attribute(opentelemetry::KeyValue::new(
            "service.name",
            service_name.to_string(),
        ))
        .build();

    let log_provider = opentelemetry_sdk::logs::SdkLoggerProvider::builder()
        .with_resource(resource)
        .with_log_processor(processor)
        .build();

    LOG_PROVIDER
        .set(log_provider)
        .map_err(|_| anyhow::anyhow!("Logger provider already initialized"))?;
    let provider = LOG_PROVIDER
        .get()
        .expect("LOG_PROVIDER was set on the line above");

    let log_layer =
        opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(provider)
            .with_filter(get_log_level_filter(log_level_override));

    Ok(log_layer)
}
382
/// Builds the filter applied to the trace-oriented layers.
///
/// Uses the level converted from `trace_level_override` when one is given, and `DEBUG`
/// otherwise.
#[cfg(feature = "otel")]
fn get_trace_level_filter(trace_level_override: Option<&Level>) -> EnvFilter {
    let max_level = trace_level_override.map_or(LevelFilter::DEBUG, wasi_level_to_tracing_level);
    EnvFilter::default().add_directive(max_level.into())
}
392
393fn get_log_level_filter(log_level_override: Option<&Level>) -> EnvFilter {
394 if let Some(log_level) = log_level_override {
395 let level = wasi_level_to_tracing_level(log_level);
396 let mut filter = EnvFilter::builder()
398 .with_default_directive(level.into())
399 .parse("")
400 .unwrap()
401 .add_directive("async_nats=info".parse().unwrap())
402 .add_directive("cranelift_codegen=warn".parse().unwrap())
403 .add_directive("hyper=info".parse().unwrap())
404 .add_directive("oci_client=info".parse().unwrap());
405
406 if let Ok(rust_log) = env::var("RUST_LOG") {
408 match rust_log
409 .split(',')
410 .map(str::parse)
411 .collect::<Result<Vec<_>, _>>()
412 {
413 Ok(directives) => {
414 for directive in directives {
415 filter = filter.add_directive(directive);
416 }
417 }
418 Err(err) => {
419 eprintln!("ERROR: Ignoring invalid RUST_LOG directive: {err}");
420 }
421 }
422 }
423
424 filter
425 } else {
426 EnvFilter::default().add_directive(LevelFilter::INFO.into())
427 }
428}
429
430fn wasi_level_to_tracing_level(level: &Level) -> LevelFilter {
431 match level {
432 Level::Error | Level::Critical => LevelFilter::ERROR,
433 Level::Warn => LevelFilter::WARN,
434 Level::Info => LevelFilter::INFO,
435 Level::Debug => LevelFilter::DEBUG,
436 Level::Trace => LevelFilter::TRACE,
437 }
438}