opentelemetry_nats/
lib.rs1use std::sync::OnceLock;
2
3use async_nats::header::{HeaderMap, HeaderValue};
4use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
5use opentelemetry_sdk::propagation::TraceContextPropagator;
6use tracing::span::Span;
7use tracing_opentelemetry::OpenTelemetrySpanExt;
8
9static EMPTY_HEADERS: OnceLock<HeaderMap> = OnceLock::new();
10
11fn empty_headers() -> &'static HeaderMap {
12 EMPTY_HEADERS.get_or_init(HeaderMap::new)
13}
14
15#[derive(Debug)]
17pub struct NatsHeaderExtractor<'a> {
18 inner: &'a HeaderMap,
19}
20
21impl<'a> NatsHeaderExtractor<'a> {
22 #[must_use]
24 pub fn new(headers: &'a HeaderMap) -> Self {
25 NatsHeaderExtractor { inner: headers }
26 }
27
28 pub fn new_from_message(msg: &'a async_nats::Message) -> Self {
30 let inner = msg.headers.as_ref().unwrap_or_else(|| empty_headers());
31 NatsHeaderExtractor { inner }
32 }
33}
34
35impl Extractor for NatsHeaderExtractor<'_> {
36 fn get(&self, key: &str) -> Option<&str> {
37 self.inner.get(key).map(HeaderValue::as_str)
38 }
39
40 fn keys(&self) -> Vec<&str> {
41 self.inner
42 .iter()
43 .map(|(k, _)| std::str::from_utf8(k.as_ref()).unwrap_or_default())
45 .collect()
46 }
47}
48
49impl<'a> AsRef<HeaderMap> for NatsHeaderExtractor<'a> {
50 fn as_ref(&self) -> &'a HeaderMap {
51 self.inner
52 }
53}
54
55#[derive(Debug, Default)]
57pub struct NatsHeaderInjector {
58 inner: HeaderMap,
59}
60
61impl NatsHeaderInjector {
62 #[must_use]
64 pub fn new(headers: HeaderMap) -> Self {
65 NatsHeaderInjector { inner: headers }
66 }
67
68 #[must_use]
71 pub fn new_with_span(headers: HeaderMap) -> Self {
72 let mut header_map = Self::new(headers);
73 header_map.inject_context();
74 header_map
75 }
76
77 #[must_use]
80 pub fn default_with_span() -> Self {
81 let mut header_map = Self::default();
82 header_map.inject_context();
83 header_map
84 }
85
86 pub fn inject_context(&mut self) {
88 let ctx_propagator = TraceContextPropagator::new();
89 ctx_propagator.inject_context(&Span::current().context(), self);
90 }
91}
92
93impl Injector for NatsHeaderInjector {
94 fn set(&mut self, key: &str, value: String) {
95 self.inner.insert(key, value.as_ref());
96 }
97}
98
99impl AsRef<HeaderMap> for NatsHeaderInjector {
100 fn as_ref(&self) -> &HeaderMap {
101 &self.inner
102 }
103}
104
105impl From<HeaderMap> for NatsHeaderInjector {
106 fn from(headers: HeaderMap) -> Self {
107 NatsHeaderInjector::new(headers)
108 }
109}
110
111impl From<NatsHeaderInjector> for HeaderMap {
112 fn from(inj: NatsHeaderInjector) -> Self {
113 inj.inner
114 }
115}
116
117pub fn attach_span_context(msg: &async_nats::Message) {
121 if let Some(ref headers) = msg.headers {
124 if headers.iter().len() > 0 {
125 let extractor = NatsHeaderExtractor::new(headers);
126 let ctx_propagator = TraceContextPropagator::new();
127 let parent_ctx = ctx_propagator.extract(&extractor);
128 Span::current().set_parent(parent_ctx);
129 }
130 }
131}