opentelemetry_nats/
lib.rs

1use std::sync::OnceLock;
2
3use async_nats::header::{HeaderMap, HeaderValue};
4use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
5use opentelemetry_sdk::propagation::TraceContextPropagator;
6use tracing::span::Span;
7use tracing_opentelemetry::OpenTelemetrySpanExt;
8
9static EMPTY_HEADERS: OnceLock<HeaderMap> = OnceLock::new();
10
11fn empty_headers() -> &'static HeaderMap {
12    EMPTY_HEADERS.get_or_init(HeaderMap::new)
13}
14
15/// A convenience type that wraps a NATS [`HeaderMap`] and implements the [`Extractor`] trait
16#[derive(Debug)]
17pub struct NatsHeaderExtractor<'a> {
18    inner: &'a HeaderMap,
19}
20
21impl<'a> NatsHeaderExtractor<'a> {
22    /// Creates a new extractor using the given [`HeaderMap`]
23    #[must_use]
24    pub fn new(headers: &'a HeaderMap) -> Self {
25        NatsHeaderExtractor { inner: headers }
26    }
27
28    /// Creates a new extractor using the given message
29    pub fn new_from_message(msg: &'a async_nats::Message) -> Self {
30        let inner = msg.headers.as_ref().unwrap_or_else(|| empty_headers());
31        NatsHeaderExtractor { inner }
32    }
33}
34
35impl Extractor for NatsHeaderExtractor<'_> {
36    fn get(&self, key: &str) -> Option<&str> {
37        self.inner.get(key).map(HeaderValue::as_str)
38    }
39
40    fn keys(&self) -> Vec<&str> {
41        self.inner
42            .iter()
43            // The underlying type is a string and this should never fail, but we unwrap to an empty string anyway
44            .map(|(k, _)| std::str::from_utf8(k.as_ref()).unwrap_or_default())
45            .collect()
46    }
47}
48
49impl<'a> AsRef<HeaderMap> for NatsHeaderExtractor<'a> {
50    fn as_ref(&self) -> &'a HeaderMap {
51        self.inner
52    }
53}
54
55/// A convenience type that wraps a NATS [`HeaderMap`] and implements the [`Injector`] trait
56#[derive(Debug, Default)]
57pub struct NatsHeaderInjector {
58    inner: HeaderMap,
59}
60
61impl NatsHeaderInjector {
62    /// Creates a new injector using the given [`HeaderMap`]
63    #[must_use]
64    pub fn new(headers: HeaderMap) -> Self {
65        NatsHeaderInjector { inner: headers }
66    }
67
68    /// Convenience constructor that returns a new injector with the current span context already
69    /// injected into the given header map
70    #[must_use]
71    pub fn new_with_span(headers: HeaderMap) -> Self {
72        let mut header_map = Self::new(headers);
73        header_map.inject_context();
74        header_map
75    }
76
77    /// Convenience constructor that returns a new injector with the current span context already
78    /// injected into a default [`HeaderMap`]
79    #[must_use]
80    pub fn default_with_span() -> Self {
81        let mut header_map = Self::default();
82        header_map.inject_context();
83        header_map
84    }
85
86    /// Injects the current context from the span into the headers
87    pub fn inject_context(&mut self) {
88        let ctx_propagator = TraceContextPropagator::new();
89        ctx_propagator.inject_context(&Span::current().context(), self);
90    }
91}
92
93impl Injector for NatsHeaderInjector {
94    fn set(&mut self, key: &str, value: String) {
95        self.inner.insert(key, value.as_ref());
96    }
97}
98
99impl AsRef<HeaderMap> for NatsHeaderInjector {
100    fn as_ref(&self) -> &HeaderMap {
101        &self.inner
102    }
103}
104
105impl From<HeaderMap> for NatsHeaderInjector {
106    fn from(headers: HeaderMap) -> Self {
107        NatsHeaderInjector::new(headers)
108    }
109}
110
111impl From<NatsHeaderInjector> for HeaderMap {
112    fn from(inj: NatsHeaderInjector) -> Self {
113        inj.inner
114    }
115}
116
117/// A convenience function that will extract headers from a message and set the parent span for the
118/// current tracing Span.  If you want to do something more advanced, use the
119/// [`NatsHeaderExtractor`] type directly
120pub fn attach_span_context(msg: &async_nats::Message) {
121    // If we extract and there are no OTEL headers, setting the parent will orphan the current span
122    // hierarchy. Checking that there are headers is a heuristic to avoid this
123    if let Some(ref headers) = msg.headers {
124        if headers.iter().len() > 0 {
125            let extractor = NatsHeaderExtractor::new(headers);
126            let ctx_propagator = TraceContextPropagator::new();
127            let parent_ctx = ctx_propagator.extract(&extractor);
128            Span::current().set_parent(parent_ctx);
129        }
130    }
131}