opentelemetry_sdk/metrics/
periodic_reader_with_async_runtime.rs1use std::{
2 env, fmt, mem,
3 sync::{Arc, Mutex, Weak},
4 time::Duration,
5};
6
7use futures_channel::{mpsc, oneshot};
8use futures_util::{
9 future::{self, Either},
10 pin_mut,
11 stream::{self, FusedStream},
12 StreamExt,
13};
14use opentelemetry::{otel_debug, otel_error};
15
16use crate::runtime::Runtime;
17use crate::{
18 error::{OTelSdkError, OTelSdkResult},
19 metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
20 Resource,
21};
22
23use super::{data::ResourceMetrics, reader::MetricReader, InstrumentKind, Pipeline};
24
/// Export timeout (30 seconds) the builder starts from when nothing else is configured.
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(30_000);
/// Export interval (60 seconds) the builder starts from when nothing else is configured.
const DEFAULT_INTERVAL: Duration = Duration::from_millis(60_000);

/// Environment variable the builder reads for the export interval (parsed as milliseconds).
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
/// Environment variable the builder reads for the export timeout (parsed as milliseconds).
const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";
30
31#[derive(Debug)]
46pub struct PeriodicReaderBuilder<E, RT> {
47 interval: Duration,
48 timeout: Duration,
49 exporter: E,
50 runtime: RT,
51}
52
53impl<E, RT> PeriodicReaderBuilder<E, RT>
54where
55 E: PushMetricExporter,
56 RT: Runtime,
57{
58 fn new(exporter: E, runtime: RT) -> Self {
59 let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
60 .ok()
61 .and_then(|v| v.parse().map(Duration::from_millis).ok())
62 .unwrap_or(DEFAULT_INTERVAL);
63 let timeout = env::var(METRIC_EXPORT_TIMEOUT_NAME)
64 .ok()
65 .and_then(|v| v.parse().map(Duration::from_millis).ok())
66 .unwrap_or(DEFAULT_TIMEOUT);
67
68 PeriodicReaderBuilder {
69 interval,
70 timeout,
71 exporter,
72 runtime,
73 }
74 }
75
76 pub fn with_interval(mut self, interval: Duration) -> Self {
84 if !interval.is_zero() {
85 self.interval = interval;
86 }
87 self
88 }
89
90 pub fn with_timeout(mut self, timeout: Duration) -> Self {
99 if !timeout.is_zero() {
100 self.timeout = timeout;
101 }
102 self
103 }
104
105 pub fn build(self) -> PeriodicReader {
107 let (message_sender, message_receiver) = mpsc::channel(256);
108
109 let worker = move |reader: &PeriodicReader| {
110 let runtime = self.runtime.clone();
111 let reader = reader.clone();
112 self.runtime.spawn(Box::pin(async move {
113 let ticker = runtime
114 .interval(self.interval)
115 .skip(1) .map(|_| Message::Export);
117 let messages = Box::pin(stream::select(message_receiver, ticker));
118 PeriodicReaderWorker {
119 reader,
120 timeout: self.timeout,
121 runtime,
122 rm: ResourceMetrics {
123 resource: Resource::empty(),
124 scope_metrics: Vec::new(),
125 },
126 }
127 .run(messages)
128 .await
129 }));
130 };
131
132 otel_debug!(
133 name: "PeriodicReader.BuildCompleted",
134 message = "Periodic reader built.",
135 interval_in_secs = self.interval.as_secs(),
136 temporality = format!("{:?}", self.exporter.temporality()),
137 );
138
139 PeriodicReader {
140 exporter: Arc::new(self.exporter),
141 inner: Arc::new(Mutex::new(PeriodicReaderInner {
142 message_sender,
143 is_shutdown: false,
144 sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
145 })),
146 }
147 }
148}
149
150#[derive(Clone)]
188pub struct PeriodicReader {
189 exporter: Arc<dyn PushMetricExporter>,
190 inner: Arc<Mutex<PeriodicReaderInner>>,
191}
192
193impl PeriodicReader {
194 pub fn builder<E, RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
196 where
197 E: PushMetricExporter,
198 RT: Runtime,
199 {
200 PeriodicReaderBuilder::new(exporter, runtime)
201 }
202}
203
204impl fmt::Debug for PeriodicReader {
205 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206 f.debug_struct("PeriodicReader").finish()
207 }
208}
209
/// Mutable state of a [`PeriodicReader`], kept behind the reader's mutex.
struct PeriodicReaderInner {
    /// Sending half of the channel whose receiving half is consumed by the worker task.
    message_sender: mpsc::Sender<Message>,
    /// Set to `true` by `shutdown`; checked by `collect`, `force_flush` and `shutdown`.
    is_shutdown: bool,
    /// Either the not-yet-started worker (before registration) or the registered producer.
    sdk_producer_or_worker: ProducerOrWorker,
}
215
/// Requests processed by the worker task.
#[derive(Debug)]
enum Message {
    /// Collect and export; produced by the interval ticker.
    Export,
    /// Collect and export, then report the result through the enclosed sender.
    Flush(oneshot::Sender<OTelSdkResult>),
    /// Collect and export one last time, shut the exporter down, report the result
    /// through the enclosed sender, and stop the worker.
    Shutdown(oneshot::Sender<OTelSdkResult>),
}
222
223enum ProducerOrWorker {
224 Producer(Weak<dyn SdkProducer>),
225 Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
226}
227
228struct PeriodicReaderWorker<RT: Runtime> {
229 reader: PeriodicReader,
230 timeout: Duration,
231 runtime: RT,
232 rm: ResourceMetrics,
233}
234
235impl<RT: Runtime> PeriodicReaderWorker<RT> {
236 async fn collect_and_export(&mut self) -> OTelSdkResult {
237 self.reader
238 .collect(&mut self.rm)
239 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
240 if self.rm.scope_metrics.is_empty() {
241 otel_debug!(
242 name: "PeriodicReaderWorker.NoMetricsToExport",
243 );
244 return Ok(());
246 }
247
248 otel_debug!(
249 name: "PeriodicReaderWorker.InvokeExporter",
250 message = "Calling exporter's export method with collected metrics.",
251 count = self.rm.scope_metrics.len(),
252 );
253 let export = self.reader.exporter.export(&mut self.rm);
254 let timeout = self.runtime.delay(self.timeout);
255 pin_mut!(export);
256 pin_mut!(timeout);
257
258 match future::select(export, timeout).await {
259 Either::Left((res, _)) => {
260 res }
262 Either::Right(_) => Err(OTelSdkError::Timeout(self.timeout)),
263 }
264 }
265
266 async fn process_message(&mut self, message: Message) -> bool {
267 match message {
268 Message::Export => {
269 otel_debug!(
270 name: "PeriodicReader.ExportTriggered",
271 message = "Export message received.",
272 );
273 if let Err(err) = self.collect_and_export().await {
274 otel_error!(
275 name: "PeriodicReader.ExportFailed",
276 message = "Failed to export metrics",
277 reason = format!("{}", err));
278 }
279 }
280 Message::Flush(ch) => {
281 otel_debug!(
282 name: "PeriodicReader.ForceFlushCalled",
283 message = "Flush message received.",
284 );
285 let res = self.collect_and_export().await;
286 if let Err(send_error) = ch.send(res) {
287 otel_debug!(
288 name: "PeriodicReader.Flush.SendResultError",
289 message = "Failed to send flush result.",
290 reason = format!("{:?}", send_error),
291 );
292 }
293 }
294 Message::Shutdown(ch) => {
295 otel_debug!(
296 name: "PeriodicReader.ShutdownCalled",
297 message = "Shutdown message received",
298 );
299 let res = self.collect_and_export().await;
300 let _ = self.reader.exporter.shutdown();
301 if let Err(send_error) =
302 ch.send(res.map_err(|e| OTelSdkError::InternalFailure(e.to_string())))
303 {
304 otel_debug!(
305 name: "PeriodicReader.Shutdown.SendResultError",
306 message = "Failed to send shutdown result",
307 reason = format!("{:?}", send_error),
308 );
309 }
310 return false;
311 }
312 }
313
314 true
315 }
316
317 async fn run(mut self, mut messages: impl FusedStream<Item = Message> + Unpin) {
318 while let Some(message) = messages.next().await {
319 if !self.process_message(message).await {
320 break;
321 }
322 }
323 }
324}
325
326impl MetricReader for PeriodicReader {
327 fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
328 let mut inner = match self.inner.lock() {
329 Ok(guard) => guard,
330 Err(_) => return,
331 };
332
333 let worker = match &mut inner.sdk_producer_or_worker {
334 ProducerOrWorker::Producer(_) => {
335 otel_debug!(name: "PeriodicReader.DuplicateRegistration",
337 message = "duplicate registration found, did not register periodic reader.");
338 return;
339 }
340 ProducerOrWorker::Worker(w) => mem::replace(w, Box::new(|_| {})),
341 };
342
343 inner.sdk_producer_or_worker = ProducerOrWorker::Producer(pipeline);
344 worker(self);
345 }
346
347 fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
348 let inner = self.inner.lock()?;
349 if inner.is_shutdown {
350 return Err(MetricError::Other("reader is shut down".into()));
351 }
352
353 if let Some(producer) = match &inner.sdk_producer_or_worker {
354 ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
355 ProducerOrWorker::Worker(_) => None,
356 } {
357 producer.produce(rm)?;
358 } else {
359 return Err(MetricError::Other("reader is not registered".into()));
360 }
361
362 Ok(())
363 }
364
365 fn force_flush(&self) -> OTelSdkResult {
366 let mut inner = self
367 .inner
368 .lock()
369 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
370 if inner.is_shutdown {
371 return Err(OTelSdkError::AlreadyShutdown);
372 }
373 let (sender, receiver) = oneshot::channel();
374 inner
375 .message_sender
376 .try_send(Message::Flush(sender))
377 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
378
379 drop(inner); futures_executor::block_on(receiver)
382 .map_err(|err| OTelSdkError::InternalFailure(err.to_string()))
383 .and_then(|res| res)
384 }
385
386 fn shutdown(&self) -> OTelSdkResult {
387 let mut inner = self
388 .inner
389 .lock()
390 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
391 if inner.is_shutdown {
392 return Err(OTelSdkError::AlreadyShutdown);
393 }
394
395 let (sender, receiver) = oneshot::channel();
396 inner
397 .message_sender
398 .try_send(Message::Shutdown(sender))
399 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
400 drop(inner); let shutdown_result = futures_executor::block_on(receiver)
403 .map_err(|err| OTelSdkError::InternalFailure(err.to_string()))?;
404
405 let mut inner = self
407 .inner
408 .lock()
409 .map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
410 inner.is_shutdown = true;
411
412 shutdown_result
413 }
414
415 fn temporality(&self, kind: InstrumentKind) -> super::Temporality {
423 kind.temporality_preference(self.exporter.temporality())
424 }
425}
426
#[cfg(all(test, feature = "testing"))]
mod tests {
    use super::PeriodicReader;
    use crate::metrics::reader::MetricReader;
    use crate::metrics::MetricError;
    use crate::{
        metrics::data::ResourceMetrics, metrics::InMemoryMetricExporter, metrics::SdkMeterProvider,
        runtime, Resource,
    };
    use opentelemetry::metrics::MeterProvider;
    use std::sync::mpsc;

    // The tests below run the same scenario from different callers (plain thread,
    // multi-threaded tokio, current-thread tokio) against both tokio runtime
    // adapters.

    #[test]
    fn collection_triggered_by_interval_tokio_current() {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() {
        collection_triggered_by_interval_helper(runtime::Tokio);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() {
        collection_triggered_by_interval_helper(runtime::Tokio);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current()
    {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current()
    {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    #[tokio::test(flavor = "current_thread")]
    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"]
    async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() {
        collection_triggered_by_interval_helper(runtime::Tokio);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    /// Collecting from a reader that was never registered must fail with the
    /// "reader is not registered" error.
    #[test]
    fn unregistered_collect() {
        // Arrange: a reader that is built but never handed to a meter provider.
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let mut rm = ResourceMetrics {
            resource: Resource::empty(),
            scope_metrics: Vec::new(),
        };

        // Act
        let result = reader.collect(&mut rm);

        // Assert
        assert!(
            matches!(result.unwrap_err(), MetricError::Other(err) if err == "reader is not registered")
        );
    }

    /// Builds a reader with a 1 ms interval on `runtime` and waits for the
    /// observable counter's callback to run, which shows that the interval ticker
    /// triggered a collection.
    fn collection_triggered_by_interval_helper<RT>(runtime: RT)
    where
        RT: crate::runtime::Runtime,
    {
        let interval = std::time::Duration::from_millis(1);
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime)
            .with_interval(interval)
            .build();
        let (sender, receiver) = mpsc::channel();

        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let _counter = meter
            .u64_observable_counter("testcounter")
            .with_callback(move |_| {
                sender.send(()).expect("channel should still be open");
            })
            .build();

        // Bounded wait: if no collection ever happens the test fails instead of
        // hanging the whole test binary. The bound is far above the 1 ms interval.
        receiver
            .recv_timeout(std::time::Duration::from_secs(10))
            .expect("message should be available in channel, indicating a collection occurred");
    }
}