opentelemetry_sdk/trace/
span_processor_with_async_runtime.rs

1use crate::error::{OTelSdkError, OTelSdkResult};
2use crate::resource::Resource;
3use crate::runtime::{RuntimeChannel, TrySend};
4use crate::trace::BatchConfig;
5use crate::trace::Span;
6use crate::trace::SpanProcessor;
7use crate::trace::{SpanData, SpanExporter};
8use futures_channel::oneshot;
9use futures_util::{
10    future::{self, BoxFuture, Either},
11    select,
12    stream::{self, FusedStream, FuturesUnordered},
13    StreamExt as _,
14};
15use opentelemetry::Context;
16use opentelemetry::{otel_debug, otel_error, otel_warn};
17use std::fmt;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::sync::Arc;
20
/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
///
/// Batch span processors need to run a background task to collect and send
/// spans. Different runtimes need different ways to handle the background task.
///
/// Note: Configuring an opentelemetry `Runtime` that's not compatible with the
/// underlying runtime can cause deadlocks (see tokio section).
///
/// ### Use with Tokio
///
/// Tokio currently offers two different schedulers: the current-thread
/// scheduler and the multi-thread scheduler. Both of them default to use batch
/// span processors to install span exporters.
///
/// Tokio's current-thread scheduler can cause the program to hang forever if
/// blocking work is scheduled with other tasks in the same runtime. To avoid
/// this, be sure to enable the `rt-tokio-current-thread` feature in this crate
/// if you are using that runtime (e.g. users of actix-web), and blocking tasks
/// will then be scheduled on a different thread.
///
/// # Examples
///
/// This processor can be configured with an [`executor`] of your choice to
/// batch and upload spans asynchronously when they end. If you have added a
/// library like [`tokio`] or [`async-std`], pass in the matching runtime value
/// (for example `runtime::Tokio`) to have batching performed in that context.
///
/// ```
/// # #[cfg(feature="tokio")]
/// # {
/// use opentelemetry::global;
/// use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace};
/// use opentelemetry_sdk::trace::BatchConfigBuilder;
/// use std::time::Duration;
/// use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
///
/// #[tokio::main]
/// async fn main() {
///     // Configure your preferred exporter
///     let exporter = NoopSpanExporter::new();
///
///     // Create a batch span processor using an exporter and a runtime
///     let batch = BatchSpanProcessor::builder(exporter, runtime::Tokio)
///         .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build())
///         .build();
///
///     // Then register the processor via `with_span_processor` so the provider exports spans in batches.
///     let provider = trace::TracerProvider::builder()
///         .with_span_processor(batch)
///         .build();
///
///     let _ = global::set_tracer_provider(provider);
/// }
/// # }
/// ```
///
/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html
/// [`tokio`]: https://tokio.rs
/// [`async-std`]: https://async.rs
pub struct BatchSpanProcessor<R: RuntimeChannel> {
    // Sending half of the channel that feeds the background worker; every
    // processor operation (export, flush, shutdown, set resource) is delivered
    // to the worker as a `BatchMessage` through it.
    message_sender: R::Sender<BatchMessage>,

    // Number of spans that could not be enqueued over the lifetime of this
    // processor. Incremented in `on_end` and reported in `shutdown`.
    dropped_spans_count: AtomicUsize,

    // The maximum queue size that was configured for this processor; kept only
    // so it can be included in the shutdown diagnostics.
    max_queue_size: usize,
}
91
92impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        f.debug_struct("BatchSpanProcessor")
95            .field("message_sender", &self.message_sender)
96            .finish()
97    }
98}
99
100impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
101    fn on_start(&self, _span: &mut Span, _cx: &Context) {
102        // Ignored
103    }
104
105    fn on_end(&self, span: SpanData) {
106        if !span.span_context.is_sampled() {
107            return;
108        }
109
110        let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
111
112        // If the queue is full, and we can't buffer a span
113        if result.is_err() {
114            // Increment the number of dropped spans. If this is the first time we've had to drop,
115            // emit a warning.
116            if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
117                otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
118                    message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped.");
119            }
120        }
121    }
122
123    fn force_flush(&self) -> OTelSdkResult {
124        let (res_sender, res_receiver) = oneshot::channel();
125        self.message_sender
126            .try_send(BatchMessage::Flush(Some(res_sender)))
127            .map_err(|err| {
128                OTelSdkError::InternalFailure(format!("Failed to send flush message: {}", err))
129            })?;
130
131        futures_executor::block_on(res_receiver).map_err(|err| {
132            OTelSdkError::InternalFailure(format!("Flush response channel error: {}", err))
133        })?
134    }
135
136    fn shutdown(&self) -> OTelSdkResult {
137        let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
138        let max_queue_size = self.max_queue_size;
139        if dropped_spans > 0 {
140            otel_warn!(
141                name: "BatchSpanProcessor.Shutdown",
142                dropped_spans = dropped_spans,
143                max_queue_size = max_queue_size,
144                message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
145            );
146        }
147
148        let (res_sender, res_receiver) = oneshot::channel();
149        self.message_sender
150            .try_send(BatchMessage::Shutdown(res_sender))
151            .map_err(|err| {
152                OTelSdkError::InternalFailure(format!("Failed to send shutdown message: {}", err))
153            })?;
154
155        futures_executor::block_on(res_receiver).map_err(|err| {
156            OTelSdkError::InternalFailure(format!("Shutdown response channel error: {}", err))
157        })?
158    }
159
160    fn set_resource(&mut self, resource: &Resource) {
161        let resource = Arc::new(resource.clone());
162        let _ = self
163            .message_sender
164            .try_send(BatchMessage::SetResource(resource));
165    }
166}
167
/// Messages sent between application thread and batch span processor's work thread.
// In this enum the size difference is not a concern because:
// 1. If we wrap SpanData into a pointer, it will add overhead when processing.
// 2. Most of the messages will be ExportSpan.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// Buffer a finished span for export; sent when a span ends.
    ExportSpan(SpanData),
    /// Flush the current buffer to the backend. It is triggered either by the
    /// preconfigured interval (no response channel) or by a call to
    /// `force_flush` (with a response channel carrying the export result).
    Flush(Option<oneshot::Sender<OTelSdkResult>>),
    /// Shut down the worker, pushing all spans in the buffer to the backend.
    /// The result of that final flush is reported through the channel.
    Shutdown(oneshot::Sender<OTelSdkResult>),
    /// Set the resource for the exporter.
    SetResource(Arc<Resource>),
}
185
/// State owned by the background worker that buffers spans and drives the
/// exporter in response to `BatchMessage`s.
struct BatchSpanProcessorInternal<R> {
    // Finished spans waiting for the next export; emptied each time an export
    // future is created.
    spans: Vec<SpanData>,
    // Export futures that are currently in flight when concurrent exports are
    // enabled.
    export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
    // Runtime handle used to create the per-export timeout delay.
    runtime: R,
    // Exporter that receives each batch, resource updates and the final
    // shutdown call.
    exporter: Box<dyn SpanExporter>,
    // Batch limits and timings (batch size, concurrency, export timeout).
    config: BatchConfig,
}
193
194impl<R: RuntimeChannel> BatchSpanProcessorInternal<R> {
195    async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
196        let export_task = self.export();
197        let task = Box::pin(async move {
198            let result = export_task.await;
199
200            if let Some(channel) = res_channel {
201                // If a response channel is provided, attempt to send the export result through it.
202                if let Err(result) = channel.send(result) {
203                    otel_debug!(
204                        name: "BatchSpanProcessor.Flush.SendResultError",
205                        reason = format!("{:?}", result)
206                    );
207                }
208            } else if let Err(err) = result {
209                // If no channel is provided and the export operation encountered an error,
210                // log the error directly here.
211                // TODO: Consider returning the status instead of logging it.
212                otel_error!(
213                    name: "BatchSpanProcessor.Flush.ExportError",
214                    reason = format!("{:?}", err),
215                    message = "Failed during the export process"
216                );
217            }
218
219            Ok(())
220        });
221
222        if self.config.max_concurrent_exports == 1 {
223            let _ = task.await;
224        } else {
225            self.export_tasks.push(task);
226            while self.export_tasks.next().await.is_some() {}
227        }
228    }
229
230    /// Process a single message
231    ///
232    /// A return value of false indicates shutdown
233    async fn process_message(&mut self, message: BatchMessage) -> bool {
234        match message {
235            // Span has finished, add to buffer of pending spans.
236            BatchMessage::ExportSpan(span) => {
237                self.spans.push(span);
238
239                if self.spans.len() == self.config.max_export_batch_size {
240                    // If concurrent exports are saturated, wait for one to complete.
241                    if !self.export_tasks.is_empty()
242                        && self.export_tasks.len() == self.config.max_concurrent_exports
243                    {
244                        self.export_tasks.next().await;
245                    }
246
247                    let export_task = self.export();
248                    let task = async move {
249                        if let Err(err) = export_task.await {
250                            otel_error!(
251                                name: "BatchSpanProcessor.Export.Error",
252                                reason = format!("{}", err)
253                            );
254                        }
255
256                        Ok(())
257                    };
258                    // Special case when not using concurrent exports
259                    if self.config.max_concurrent_exports == 1 {
260                        let _ = task.await;
261                    } else {
262                        self.export_tasks.push(Box::pin(task));
263                    }
264                }
265            }
266            // Span batch interval time reached or a force flush has been invoked, export
267            // current spans.
268            //
269            // This is a hint to ensure that any tasks associated with Spans for which the
270            // SpanProcessor had already received events prior to the call to ForceFlush
271            // SHOULD be completed as soon as possible, preferably before returning from
272            // this method.
273            //
274            // In particular, if any SpanProcessor has any associated exporter, it SHOULD
275            // try to call the exporter's Export with all spans for which this was not
276            // already done and then invoke ForceFlush on it. The built-in SpanProcessors
277            // MUST do so. If a timeout is specified (see below), the SpanProcessor MUST
278            // prioritize honoring the timeout over finishing all calls. It MAY skip or
279            // abort some or all Export or ForceFlush calls it has made to achieve this
280            // goal.
281            //
282            // NB: `force_flush` is not currently implemented on exporters; the equivalent
283            // would be waiting for exporter tasks to complete. In the case of
284            // channel-coupled exporters, they will need a `force_flush` implementation to
285            // properly block.
286            BatchMessage::Flush(res_channel) => {
287                self.flush(res_channel).await;
288            }
289            // Stream has terminated or processor is shutdown, return to finish execution.
290            BatchMessage::Shutdown(ch) => {
291                self.flush(Some(ch)).await;
292                let _ = self.exporter.shutdown();
293                return false;
294            }
295            // propagate the resource
296            BatchMessage::SetResource(resource) => {
297                self.exporter.set_resource(&resource);
298            }
299        }
300        true
301    }
302
303    fn export(&mut self) -> BoxFuture<'static, OTelSdkResult> {
304        // Batch size check for flush / shutdown. Those methods may be called
305        // when there's no work to do.
306        if self.spans.is_empty() {
307            return Box::pin(future::ready(Ok(())));
308        }
309
310        let export = self.exporter.export(self.spans.split_off(0));
311        let timeout = self.runtime.delay(self.config.max_export_timeout);
312        let time_out = self.config.max_export_timeout;
313
314        Box::pin(async move {
315            match future::select(export, timeout).await {
316                Either::Left((export_res, _)) => export_res,
317                Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
318            }
319        })
320    }
321
322    async fn run(mut self, mut messages: impl FusedStream<Item = BatchMessage> + Unpin) {
323        loop {
324            select! {
325                // FuturesUnordered implements Fuse intelligently such that it
326                // will become eligible again once new tasks are added to it.
327                _ = self.export_tasks.next() => {
328                    // An export task completed; do we need to do anything with it?
329                },
330                message = messages.next() => {
331                    match message {
332                        Some(message) => {
333                            if !self.process_message(message).await {
334                                break;
335                            }
336                        },
337                        None => break,
338                    }
339                },
340            }
341        }
342    }
343}
344
345impl<R: RuntimeChannel> BatchSpanProcessor<R> {
346    pub(crate) fn new(exporter: Box<dyn SpanExporter>, config: BatchConfig, runtime: R) -> Self {
347        let (message_sender, message_receiver) =
348            runtime.batch_message_channel(config.max_queue_size);
349
350        let max_queue_size = config.max_queue_size;
351
352        let inner_runtime = runtime.clone();
353        // Spawn worker process via user-defined spawn function.
354        runtime.spawn(Box::pin(async move {
355            // Timer will take a reference to the current runtime, so its important we do this within the
356            // runtime.spawn()
357            let ticker = inner_runtime
358                .interval(config.scheduled_delay)
359                .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
360                .map(|_| BatchMessage::Flush(None));
361            let timeout_runtime = inner_runtime.clone();
362
363            let messages = Box::pin(stream::select(message_receiver, ticker));
364            let processor = BatchSpanProcessorInternal {
365                spans: Vec::new(),
366                export_tasks: FuturesUnordered::new(),
367                runtime: timeout_runtime,
368                config,
369                exporter,
370            };
371
372            processor.run(messages).await
373        }));
374
375        // Return batch processor with link to worker
376        BatchSpanProcessor {
377            message_sender,
378            dropped_spans_count: AtomicUsize::new(0),
379            max_queue_size,
380        }
381    }
382
383    /// Create a new batch processor builder
384    pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
385    where
386        E: SpanExporter,
387    {
388        BatchSpanProcessorBuilder {
389            exporter,
390            config: Default::default(),
391            runtime,
392        }
393    }
394}
395
/// A builder for creating [`BatchSpanProcessor`] instances.
///
/// Holds the exporter, batch configuration and runtime until
/// `build` hands them to the processor.
#[derive(Debug)]
pub struct BatchSpanProcessorBuilder<E, R> {
    // Exporter that the built processor will send batches to.
    exporter: E,
    // Batch configuration; replaced wholesale by `with_batch_config`.
    config: BatchConfig,
    // Runtime passed through to the built processor.
    runtime: R,
}
404
405impl<E, R> BatchSpanProcessorBuilder<E, R>
406where
407    E: SpanExporter + 'static,
408    R: RuntimeChannel,
409{
410    /// Set the BatchConfig for [BatchSpanProcessorBuilder]
411    pub fn with_batch_config(self, config: BatchConfig) -> Self {
412        BatchSpanProcessorBuilder { config, ..self }
413    }
414
415    /// Build a batch processor
416    pub fn build(self) -> BatchSpanProcessor<R> {
417        BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
418    }
419}
420
421#[cfg(all(test, feature = "testing", feature = "trace"))]
422mod tests {
    // cargo test trace::span_processor_with_async_runtime::tests:: --features=testing
424    use super::{BatchSpanProcessor, SpanProcessor};
425    use crate::error::OTelSdkResult;
426    use crate::runtime;
427    use crate::testing::trace::{new_test_export_span_data, new_tokio_test_exporter};
428    use crate::trace::span_processor::{
429        OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE,
430        OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
431    };
432    use crate::trace::{BatchConfig, BatchConfigBuilder, InMemorySpanExporterBuilder};
433    use crate::trace::{SpanData, SpanExporter};
434    use futures_util::Future;
435    use std::fmt::Debug;
436    use std::time::Duration;
437
    /// Test exporter whose `export` completes only after a configurable delay.
    struct BlockingExporter<D> {
        // How long each export call should take.
        delay_for: Duration,
        // Runtime-specific sleep function used to produce the delay future.
        delay_fn: D,
    }
442
443    impl<D, DS> Debug for BlockingExporter<D>
444    where
445        D: Fn(Duration) -> DS + 'static + Send + Sync,
446        DS: Future<Output = ()> + Send + Sync + 'static,
447    {
448        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
449            f.write_str("blocking exporter for testing")
450        }
451    }
452
453    impl<D, DS> SpanExporter for BlockingExporter<D>
454    where
455        D: Fn(Duration) -> DS + 'static + Send + Sync,
456        DS: Future<Output = ()> + Send + Sync + 'static,
457    {
458        fn export(
459            &mut self,
460            _batch: Vec<SpanData>,
461        ) -> futures_util::future::BoxFuture<'static, OTelSdkResult> {
462            use futures_util::FutureExt;
463            Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(())))
464        }
465    }
466
467    #[test]
468    fn test_build_batch_span_processor_builder() {
469        let mut env_vars = vec![
470            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")),
471            (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")),
472            (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
473        ];
474        temp_env::with_vars(env_vars.clone(), || {
475            let builder = BatchSpanProcessor::builder(
476                InMemorySpanExporterBuilder::new().build(),
477                runtime::Tokio,
478            );
479            // export batch size cannot exceed max queue size
480            assert_eq!(builder.config.max_export_batch_size, 500);
481            assert_eq!(
482                builder.config.scheduled_delay,
483                Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
484            );
485            assert_eq!(
486                builder.config.max_queue_size,
487                OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
488            );
489            assert_eq!(
490                builder.config.max_export_timeout,
491                Duration::from_millis(2046)
492            );
493        });
494
495        env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));
496
497        temp_env::with_vars(env_vars, || {
498            let builder = BatchSpanProcessor::builder(
499                InMemorySpanExporterBuilder::new().build(),
500                runtime::Tokio,
501            );
502            assert_eq!(builder.config.max_export_batch_size, 120);
503            assert_eq!(builder.config.max_queue_size, 120);
504        });
505    }
506
507    #[tokio::test]
508    async fn test_batch_span_processor() {
509        let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
510        let config = BatchConfigBuilder::default()
511            .with_scheduled_delay(Duration::from_secs(60 * 60 * 24)) // set the tick to 24 hours so we know the span must be exported via force_flush
512            .build();
513        let processor =
514            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
515        let handle = tokio::spawn(async move {
516            loop {
517                if let Some(span) = export_receiver.recv().await {
518                    assert_eq!(span.span_context, new_test_export_span_data().span_context);
519                    break;
520                }
521            }
522        });
523        tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
524        processor.on_end(new_test_export_span_data());
525        let flush_res = processor.force_flush();
526        assert!(flush_res.is_ok());
527        let _shutdown_result = processor.shutdown();
528
529        assert!(
530            tokio::time::timeout(Duration::from_secs(5), handle)
531                .await
532                .is_ok(),
533            "timed out in 5 seconds. force_flush may not export any data when called"
534        );
535    }
536
537    // If the time_out is true, then the result suppose to ended with timeout.
538    // otherwise the exporter should be able to export within time out duration.
539    async fn timeout_test_tokio(time_out: bool) {
540        let config = BatchConfig {
541            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
542            scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush,
543            ..Default::default()
544        };
545        let exporter = BlockingExporter {
546            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
547            delay_fn: tokio::time::sleep,
548        };
549        let processor =
550            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
551        tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
552        processor.on_end(new_test_export_span_data());
553        let flush_res = processor.force_flush();
554        if time_out {
555            assert!(flush_res.is_err());
556        } else {
557            assert!(flush_res.is_ok());
558        }
559        let shutdown_res = processor.shutdown();
560        assert!(shutdown_res.is_ok());
561    }
562
563    #[test]
564    fn test_timeout_tokio_timeout() {
565        // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s.
566        // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s.
567        // Either way, the test should be finished within 5s.
568        let runtime = tokio::runtime::Builder::new_multi_thread()
569            .enable_all()
570            .build()
571            .unwrap();
572        runtime.block_on(timeout_test_tokio(true));
573    }
574
575    #[test]
576    fn test_timeout_tokio_not_timeout() {
577        let runtime = tokio::runtime::Builder::new_multi_thread()
578            .enable_all()
579            .build()
580            .unwrap();
581        runtime.block_on(timeout_test_tokio(false));
582    }
583
584    #[test]
585    #[cfg(feature = "rt-async-std")]
586    fn test_timeout_async_std_timeout() {
587        async_std::task::block_on(timeout_test_std_async(true));
588    }
589
590    #[test]
591    #[cfg(feature = "rt-async-std")]
592    fn test_timeout_async_std_not_timeout() {
593        async_std::task::block_on(timeout_test_std_async(false));
594    }
595
596    // If the time_out is true, then the result suppose to ended with timeout.
597    // otherwise the exporter should be able to export within time out duration.
598    #[cfg(feature = "rt-async-std")]
599    async fn timeout_test_std_async(time_out: bool) {
600        let config = BatchConfig {
601            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
602            scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush
603            ..Default::default()
604        };
605        let exporter = BlockingExporter {
606            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
607            delay_fn: async_std::task::sleep,
608        };
609        let processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
610        processor.on_end(new_test_export_span_data());
611        let flush_res = processor.force_flush();
612        if time_out {
613            assert!(flush_res.is_err());
614        } else {
615            assert!(flush_res.is_ok());
616        }
617        let shutdown_res = processor.shutdown();
618        assert!(shutdown_res.is_ok());
619    }
620}