opentelemetry_sdk/metrics/
periodic_reader_with_async_runtime.rs

1use std::{
2    env, fmt, mem,
3    sync::{Arc, Mutex, Weak},
4    time::Duration,
5};
6
7use futures_channel::{mpsc, oneshot};
8use futures_util::{
9    future::{self, Either},
10    pin_mut,
11    stream::{self, FusedStream},
12    StreamExt,
13};
14use opentelemetry::{otel_debug, otel_error};
15
16use crate::runtime::Runtime;
17use crate::{
18    error::{OTelSdkError, OTelSdkResult},
19    metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
20    Resource,
21};
22
23use super::{data::ResourceMetrics, reader::MetricReader, InstrumentKind, Pipeline};
24
/// Export timeout used when neither the builder nor the environment supplies one.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Export interval used when neither the builder nor the environment supplies one.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);

/// Environment variable read (as milliseconds) for the initial export interval.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
/// Environment variable read (as milliseconds) for the initial export timeout.
const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";
30
/// Builder holding the configuration for a [PeriodicReader].
///
/// A periodic reader is a [MetricReader] that, on a fixed schedule, collects
/// metric data and hands it to the configured exporter.
///
/// Unless overridden, the reader built from this configuration collects and
/// exports every 60 seconds and gives up on an export attempt after 30
/// seconds. Time spent exporting is not counted towards the interval between
/// attempts.
///
/// Calling [collect] on the resulting reader still gathers metric data and
/// returns it to the caller, but does not forward that data to the exporter;
/// exporting only happens on the configured schedule.
///
/// [collect]: MetricReader::collect
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E, RT> {
    /// Period between scheduled exports.
    interval: Duration,
    /// Maximum time a single export attempt may take.
    timeout: Duration,
    /// Exporter that receives the collected metrics.
    exporter: E,
    /// Async runtime used to spawn the worker and to drive its timers.
    runtime: RT,
}
52
53impl<E, RT> PeriodicReaderBuilder<E, RT>
54where
55    E: PushMetricExporter,
56    RT: Runtime,
57{
58    fn new(exporter: E, runtime: RT) -> Self {
59        let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
60            .ok()
61            .and_then(|v| v.parse().map(Duration::from_millis).ok())
62            .unwrap_or(DEFAULT_INTERVAL);
63        let timeout = env::var(METRIC_EXPORT_TIMEOUT_NAME)
64            .ok()
65            .and_then(|v| v.parse().map(Duration::from_millis).ok())
66            .unwrap_or(DEFAULT_TIMEOUT);
67
68        PeriodicReaderBuilder {
69            interval,
70            timeout,
71            exporter,
72            runtime,
73        }
74    }
75
76    /// Configures the intervening time between exports for a [PeriodicReader].
77    ///
78    /// This option overrides any value set for the `OTEL_METRIC_EXPORT_INTERVAL`
79    /// environment variable.
80    ///
81    /// If this option is not used or `interval` is equal to zero, 60 seconds is
82    /// used as the default.
83    pub fn with_interval(mut self, interval: Duration) -> Self {
84        if !interval.is_zero() {
85            self.interval = interval;
86        }
87        self
88    }
89
90    /// Configures the time a [PeriodicReader] waits for an export to complete
91    /// before canceling it.
92    ///
93    /// This option overrides any value set for the `OTEL_METRIC_EXPORT_TIMEOUT`
94    /// environment variable.
95    ///
96    /// If this option is not used or `timeout` is equal to zero, 30 seconds is used
97    /// as the default.
98    pub fn with_timeout(mut self, timeout: Duration) -> Self {
99        if !timeout.is_zero() {
100            self.timeout = timeout;
101        }
102        self
103    }
104
105    /// Create a [PeriodicReader] with the given config.
106    pub fn build(self) -> PeriodicReader {
107        let (message_sender, message_receiver) = mpsc::channel(256);
108
109        let worker = move |reader: &PeriodicReader| {
110            let runtime = self.runtime.clone();
111            let reader = reader.clone();
112            self.runtime.spawn(Box::pin(async move {
113                let ticker = runtime
114                    .interval(self.interval)
115                    .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
116                    .map(|_| Message::Export);
117                let messages = Box::pin(stream::select(message_receiver, ticker));
118                PeriodicReaderWorker {
119                    reader,
120                    timeout: self.timeout,
121                    runtime,
122                    rm: ResourceMetrics {
123                        resource: Resource::empty(),
124                        scope_metrics: Vec::new(),
125                    },
126                }
127                .run(messages)
128                .await
129            }));
130        };
131
132        otel_debug!(
133            name: "PeriodicReader.BuildCompleted",
134            message = "Periodic reader built.",
135            interval_in_secs = self.interval.as_secs(),
136            temporality = format!("{:?}", self.exporter.temporality()),
137        );
138
139        PeriodicReader {
140            exporter: Arc::new(self.exporter),
141            inner: Arc::new(Mutex::new(PeriodicReaderInner {
142                message_sender,
143                is_shutdown: false,
144                sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
145            })),
146        }
147    }
148}
149
150/// A [MetricReader] that continuously collects and exports metric data at a set
151/// interval.
152///
153/// By default it will collect and export data every 60 seconds, and will cancel
154/// export attempts that exceed 30 seconds. The export time is not counted
155/// towards the interval between attempts.
156///
157/// The [collect] method of the returned continues to gather and
158/// return metric data to the user. It will not automatically send that data to
159/// the exporter outside of the predefined interval.
160///
161/// The [runtime] can be selected based on feature flags set for this crate.
162///
163/// The exporter can be any exporter that implements [PushMetricExporter] such
164/// as [opentelemetry-otlp].
165///
166/// [collect]: MetricReader::collect
167/// [runtime]: crate::runtime
168/// [opentelemetry-otlp]: https://docs.rs/opentelemetry-otlp/latest/opentelemetry_otlp/
169///
170/// # Example
171///
172/// ```no_run
173/// use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader;
174/// # fn example<E, R>(get_exporter: impl Fn() -> E, get_runtime: impl Fn() -> R)
175/// # where
176/// #     E: opentelemetry_sdk::metrics::exporter::PushMetricExporter,
177/// #     R: opentelemetry_sdk::runtime::Runtime,
178/// # {
179///
180/// let exporter = get_exporter(); // set up a push exporter like OTLP
181/// let runtime = get_runtime(); // select runtime: e.g. opentelemetry_sdk:runtime::Tokio
182///
183/// let reader = PeriodicReader::builder(exporter, runtime).build();
184/// # drop(reader);
185/// # }
186/// ```
187#[derive(Clone)]
188pub struct PeriodicReader {
189    exporter: Arc<dyn PushMetricExporter>,
190    inner: Arc<Mutex<PeriodicReaderInner>>,
191}
192
193impl PeriodicReader {
194    /// Configuration options for a periodic reader
195    pub fn builder<E, RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
196    where
197        E: PushMetricExporter,
198        RT: Runtime,
199    {
200        PeriodicReaderBuilder::new(exporter, runtime)
201    }
202}
203
204impl fmt::Debug for PeriodicReader {
205    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206        f.debug_struct("PeriodicReader").finish()
207    }
208}
209
210struct PeriodicReaderInner {
211    message_sender: mpsc::Sender<Message>,
212    is_shutdown: bool,
213    sdk_producer_or_worker: ProducerOrWorker,
214}
215
216#[derive(Debug)]
217enum Message {
218    Export,
219    Flush(oneshot::Sender<OTelSdkResult>),
220    Shutdown(oneshot::Sender<OTelSdkResult>),
221}
222
223enum ProducerOrWorker {
224    Producer(Weak<dyn SdkProducer>),
225    Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
226}
227
228struct PeriodicReaderWorker<RT: Runtime> {
229    reader: PeriodicReader,
230    timeout: Duration,
231    runtime: RT,
232    rm: ResourceMetrics,
233}
234
235impl<RT: Runtime> PeriodicReaderWorker<RT> {
236    async fn collect_and_export(&mut self) -> OTelSdkResult {
237        self.reader
238            .collect(&mut self.rm)
239            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
240        if self.rm.scope_metrics.is_empty() {
241            otel_debug!(
242                name: "PeriodicReaderWorker.NoMetricsToExport",
243            );
244            // No metrics to export.
245            return Ok(());
246        }
247
248        otel_debug!(
249            name: "PeriodicReaderWorker.InvokeExporter",
250            message = "Calling exporter's export method with collected metrics.",
251            count = self.rm.scope_metrics.len(),
252        );
253        let export = self.reader.exporter.export(&mut self.rm);
254        let timeout = self.runtime.delay(self.timeout);
255        pin_mut!(export);
256        pin_mut!(timeout);
257
258        match future::select(export, timeout).await {
259            Either::Left((res, _)) => {
260                res // return the status of export.
261            }
262            Either::Right(_) => Err(OTelSdkError::Timeout(self.timeout)),
263        }
264    }
265
266    async fn process_message(&mut self, message: Message) -> bool {
267        match message {
268            Message::Export => {
269                otel_debug!(
270                    name: "PeriodicReader.ExportTriggered",
271                    message = "Export message received.",
272                );
273                if let Err(err) = self.collect_and_export().await {
274                    otel_error!(
275                        name: "PeriodicReader.ExportFailed",
276                        message = "Failed to export metrics",
277                        reason = format!("{}", err));
278                }
279            }
280            Message::Flush(ch) => {
281                otel_debug!(
282                    name: "PeriodicReader.ForceFlushCalled",
283                    message = "Flush message received.",
284                );
285                let res = self.collect_and_export().await;
286                if let Err(send_error) = ch.send(res) {
287                    otel_debug!(
288                        name: "PeriodicReader.Flush.SendResultError",
289                        message = "Failed to send flush result.",
290                        reason = format!("{:?}", send_error),
291                    );
292                }
293            }
294            Message::Shutdown(ch) => {
295                otel_debug!(
296                    name: "PeriodicReader.ShutdownCalled",
297                    message = "Shutdown message received",
298                );
299                let res = self.collect_and_export().await;
300                let _ = self.reader.exporter.shutdown();
301                if let Err(send_error) =
302                    ch.send(res.map_err(|e| OTelSdkError::InternalFailure(e.to_string())))
303                {
304                    otel_debug!(
305                        name: "PeriodicReader.Shutdown.SendResultError",
306                        message = "Failed to send shutdown result",
307                        reason = format!("{:?}", send_error),
308                    );
309                }
310                return false;
311            }
312        }
313
314        true
315    }
316
317    async fn run(mut self, mut messages: impl FusedStream<Item = Message> + Unpin) {
318        while let Some(message) = messages.next().await {
319            if !self.process_message(message).await {
320                break;
321            }
322        }
323    }
324}
325
326impl MetricReader for PeriodicReader {
327    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
328        let mut inner = match self.inner.lock() {
329            Ok(guard) => guard,
330            Err(_) => return,
331        };
332
333        let worker = match &mut inner.sdk_producer_or_worker {
334            ProducerOrWorker::Producer(_) => {
335                // Only register once. If producer is already set, do nothing.
336                otel_debug!(name: "PeriodicReader.DuplicateRegistration",
337                    message = "duplicate registration found, did not register periodic reader.");
338                return;
339            }
340            ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})),
341        };
342
343        inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline);
344        worker(self);
345    }
346
347    fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
348        let inner = self.inner.lock()?;
349        if inner.is_shutdown {
350            return Err(MetricError::Other("reader is shut down".into()));
351        }
352
353        if let Some(producer) = match &inner.sdk_producer_or_worker {
354            ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
355            ProducerOrWorker::Worker(_) => None,
356        } {
357            producer.produce(rm)?;
358        } else {
359            return Err(MetricError::Other("reader is not registered".into()));
360        }
361
362        Ok(())
363    }
364
365    fn force_flush(&self) -> OTelSdkResult {
366        let mut inner = self
367            .inner
368            .lock()
369            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
370        if inner.is_shutdown {
371            return Err(OTelSdkError::AlreadyShutdown);
372        }
373        let (sender, receiver) = oneshot::channel();
374        inner
375            .message_sender
376            .try_send(Message::Flush(sender))
377            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
378
379        drop(inner); // don't hold lock when blocking on future
380
381        futures_executor::block_on(receiver)
382            .map_err(|err| OTelSdkError::InternalFailure(err.to_string()))
383            .and_then(|res| res)
384    }
385
386    fn shutdown(&self) -> OTelSdkResult {
387        let mut inner = self
388            .inner
389            .lock()
390            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
391        if inner.is_shutdown {
392            return Err(OTelSdkError::AlreadyShutdown);
393        }
394
395        let (sender, receiver) = oneshot::channel();
396        inner
397            .message_sender
398            .try_send(Message::Shutdown(sender))
399            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
400        drop(inner); // don't hold lock when blocking on future
401
402        let shutdown_result = futures_executor::block_on(receiver)
403            .map_err(|err| OTelSdkError::InternalFailure(err.to_string()))?;
404
405        // Acquire the lock again to set the shutdown flag
406        let mut inner = self
407            .inner
408            .lock()
409            .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
410        inner.is_shutdown = true;
411
412        shutdown_result
413    }
414
415    /// To construct a [MetricReader][metric-reader] when setting up an SDK,
416    /// The output temporality (optional), a function of instrument kind.
417    /// This function SHOULD be obtained from the exporter.
418    ///
419    /// If not configured, the Cumulative temporality SHOULD be used.
420    ///  
421    /// [metric-reader]: https://github.com/open-telemetry/opentelemetry-specification/blob/0a78571045ca1dca48621c9648ec3c832c3c541c/specification/metrics/sdk.md#metricreader
422    fn temporality(&self, kind: InstrumentKind) -> super::Temporality {
423        kind.temporality_preference(self.exporter.temporality())
424    }
425}
426
#[cfg(all(test, feature = "testing"))]
mod tests {
    use super::PeriodicReader;
    use crate::metrics::reader::MetricReader;
    use crate::metrics::MetricError;
    use crate::{
        metrics::data::ResourceMetrics, metrics::InMemoryMetricExporter, metrics::SdkMeterProvider,
        runtime, Resource,
    };
    use opentelemetry::metrics::MeterProvider;
    use std::sync::mpsc;

    // The tests below run the same interval-driven scenario from different
    // combinations of calling context and SDK runtime.

    /// Plain thread, no ambient Tokio runtime.
    #[test]
    fn collection_triggered_by_interval_tokio_current() {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() {
        collection_triggered_by_interval_helper(runtime::Tokio);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() {
        collection_triggered_by_interval_helper(runtime::Tokio);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current()
    {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current()
    {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    #[tokio::test(flavor = "current_thread")]
    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"]
    async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() {
        collection_triggered_by_interval_helper(runtime::Tokio);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    /// Collecting from a reader that was never registered must fail.
    #[test]
    fn unregistered_collect() {
        // Arrange
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let mut rm = ResourceMetrics {
            resource: Resource::empty(),
            scope_metrics: Vec::new(),
        };

        // Act
        let outcome = reader.collect(&mut rm);

        // Assert
        let failure = outcome.unwrap_err();
        assert!(matches!(failure, MetricError::Other(msg) if msg == "reader is not registered"));
    }

    /// Builds a reader with a 1 ms interval on `runtime`, registers an
    /// observable counter, and blocks until its callback has run once.
    fn collection_triggered_by_interval_helper<RT>(runtime: RT)
    where
        RT: crate::runtime::Runtime,
    {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime)
            .with_interval(std::time::Duration::from_millis(1))
            .build();
        let (callback_tx, callback_rx) = mpsc::channel();

        // Act
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let _counter = meter
            .u64_observable_counter("testcounter")
            .with_callback(move |_| {
                callback_tx.send(()).expect("channel should still be open");
            })
            .build();

        // Assert
        callback_rx
            .recv()
            .expect("message should be available in channel, indicating a collection occurred");
    }
}