use crate::error::{OTelSdkError, OTelSdkResult};
use crate::resource::Resource;
use crate::runtime::{RuntimeChannel, TrySend};
use crate::trace::BatchConfig;
use crate::trace::Span;
use crate::trace::SpanProcessor;
use crate::trace::{SpanData, SpanExporter};
use futures_channel::oneshot;
use futures_util::{
    future::{self, BoxFuture, Either},
    select,
    stream::{self, FusedStream, FuturesUnordered},
    StreamExt as _,
};
use opentelemetry::Context;
use opentelemetry::{otel_debug, otel_error, otel_warn};
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

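/// A [`SpanProcessor`] that buffers finished spans and exports them in batches
/// from a background task running on the provided async runtime.
///
/// # Example
///
/// A minimal construction sketch (assumes the `rt-tokio` feature; the in-memory
/// exporter from this crate's testing utilities stands in for a real exporter):
///
/// ```ignore
/// use opentelemetry_sdk::runtime;
/// use opentelemetry_sdk::trace::{BatchSpanProcessor, InMemorySpanExporterBuilder};
///
/// let exporter = InMemorySpanExporterBuilder::new().build();
/// let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
/// // Typically the processor is then registered on the SDK tracer provider builder.
/// ```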
pub struct BatchSpanProcessor<R: RuntimeChannel> {
    /// Channel used to hand spans and control messages to the background task.
    message_sender: R::Sender<BatchMessage>,

    /// Number of spans dropped because the channel was full or closed.
    dropped_spans_count: AtomicUsize,

    /// Capacity of the message channel, kept for diagnostics on shutdown.
    max_queue_size: usize,
}

impl<R: RuntimeChannel> fmt::Debug for BatchSpanProcessor<R> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("BatchSpanProcessor")
            .field("message_sender", &self.message_sender)
            .finish()
    }
}

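// `on_end` hands finished spans to the background task over a bounded channel
// via `try_send`, dropping (and counting) spans when it is full; `force_flush`
// and `shutdown` block on a oneshot response from that task.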
impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
    fn on_start(&self, _span: &mut Span, _cx: &Context) {
        // Ignored: batching only acts on finished spans.
    }

    fn on_end(&self, span: SpanData) {
        if !span.span_context.is_sampled() {
            return;
        }

        let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));

        if result.is_err() {
            // Log once when dropping starts; the total count is reported on shutdown.
            if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 {
                otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
                    message = "Beginning to drop span messages due to a full queue or internal errors. No further log will be emitted for subsequent drops until shutdown, at which point a log with the exact count of dropped spans will be emitted.");
            }
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Flush(Some(res_sender)))
            .map_err(|err| {
                OTelSdkError::InternalFailure(format!("Failed to send flush message: {}", err))
            })?;

        futures_executor::block_on(res_receiver).map_err(|err| {
            OTelSdkError::InternalFailure(format!("Flush response channel error: {}", err))
        })?
    }

    fn shutdown(&self) -> OTelSdkResult {
        let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
        let max_queue_size = self.max_queue_size;
        if dropped_spans > 0 {
            otel_warn!(
                name: "BatchSpanProcessor.Shutdown",
                dropped_spans = dropped_spans,
                max_queue_size = max_queue_size,
                message = "Spans were dropped due to a full or closed queue. The count represents the total number of spans dropped over the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decreasing the delay between scheduled exports."
            );
        }

        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Shutdown(res_sender))
            .map_err(|err| {
                OTelSdkError::InternalFailure(format!("Failed to send shutdown message: {}", err))
            })?;

        futures_executor::block_on(res_receiver).map_err(|err| {
            OTelSdkError::InternalFailure(format!("Shutdown response channel error: {}", err))
        })?
    }

    fn set_resource(&mut self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    /// Export a finished span.
    ExportSpan(SpanData),
    /// Flush the current buffer, optionally notifying the caller of the result.
    Flush(Option<oneshot::Sender<OTelSdkResult>>),
    /// Flush the current buffer, shut the exporter down, and notify the caller.
    Shutdown(oneshot::Sender<OTelSdkResult>),
    /// Propagate a new resource to the exporter.
    SetResource(Arc<Resource>),
}

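/// State owned by the background task: buffered spans, in-flight export
/// futures, and the exporter they are sent to.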
struct BatchSpanProcessorInternal<R> {
    spans: Vec<SpanData>,
    export_tasks: FuturesUnordered<BoxFuture<'static, OTelSdkResult>>,
    runtime: R,
    exporter: Box<dyn SpanExporter>,
    config: BatchConfig,
}

impl<R: RuntimeChannel> BatchSpanProcessorInternal<R> {
    async fn flush(&mut self, res_channel: Option<oneshot::Sender<OTelSdkResult>>) {
        let export_task = self.export();
        let task = Box::pin(async move {
            let result = export_task.await;

            // If a caller is waiting on the result, send it back; otherwise log
            // export failures here since nobody else will observe them.
            if let Some(channel) = res_channel {
                if let Err(result) = channel.send(result) {
                    otel_debug!(
                        name: "BatchSpanProcessor.Flush.SendResultError",
                        reason = format!("{:?}", result)
                    );
                }
            } else if let Err(err) = result {
                otel_error!(
                    name: "BatchSpanProcessor.Flush.ExportError",
                    reason = format!("{:?}", err),
                    message = "Failed during the export process"
                );
            }

            Ok(())
        });

        if self.config.max_concurrent_exports == 1 {
            let _ = task.await;
        } else {
            // Drain any in-flight exports so a flush leaves nothing pending.
            self.export_tasks.push(task);
            while self.export_tasks.next().await.is_some() {}
        }
    }

    /// Handles a single [`BatchMessage`]. Returns `false` when the processor
    /// should stop (i.e. after a shutdown message).
    async fn process_message(&mut self, message: BatchMessage) -> bool {
        match message {
            BatchMessage::ExportSpan(span) => {
                self.spans.push(span);

                if self.spans.len() == self.config.max_export_batch_size {
                    // If concurrent exports are saturated, wait for one to finish
                    // before starting another.
                    if !self.export_tasks.is_empty()
                        && self.export_tasks.len() == self.config.max_concurrent_exports
                    {
                        self.export_tasks.next().await;
                    }

                    let export_task = self.export();
                    let task = async move {
                        if let Err(err) = export_task.await {
                            otel_error!(
                                name: "BatchSpanProcessor.Export.Error",
                                reason = format!("{}", err)
                            );
                        }

                        Ok(())
                    };
                    // With a single allowed export, await inline to apply backpressure.
                    if self.config.max_concurrent_exports == 1 {
                        let _ = task.await;
                    } else {
                        self.export_tasks.push(Box::pin(task));
                    }
                }
            }
            BatchMessage::Flush(res_channel) => {
                self.flush(res_channel).await;
            }
            BatchMessage::Shutdown(ch) => {
                self.flush(Some(ch)).await;
                let _ = self.exporter.shutdown();
                return false;
            }
            BatchMessage::SetResource(resource) => {
                self.exporter.set_resource(&resource);
            }
        }
        true
    }

    fn export(&mut self) -> BoxFuture<'static, OTelSdkResult> {
        // Nothing to export: resolve immediately so callers are not blocked.
        if self.spans.is_empty() {
            return Box::pin(future::ready(Ok(())));
        }

        let export = self.exporter.export(self.spans.split_off(0));
        let timeout = self.runtime.delay(self.config.max_export_timeout);
        let time_out = self.config.max_export_timeout;

        // Race the export against the configured timeout.
        Box::pin(async move {
            match future::select(export, timeout).await {
                Either::Left((export_res, _)) => export_res,
                Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
            }
        })
    }

    async fn run(mut self, mut messages: impl FusedStream<Item = BatchMessage> + Unpin) {
        loop {
            select! {
                // Poll in-flight export tasks so they make progress; their results
                // are handled inside the tasks themselves.
                _ = self.export_tasks.next() => {
                },
                message = messages.next() => {
                    match message {
                        Some(message) => {
                            if !self.process_message(message).await {
                                break;
                            }
                        },
                        None => break,
                    }
                },
            }
        }
    }
}

impl<R: RuntimeChannel> BatchSpanProcessor<R> {
    pub(crate) fn new(exporter: Box<dyn SpanExporter>, config: BatchConfig, runtime: R) -> Self {
        let (message_sender, message_receiver) =
            runtime.batch_message_channel(config.max_queue_size);

        let max_queue_size = config.max_queue_size;

        let inner_runtime = runtime.clone();
        runtime.spawn(Box::pin(async move {
            // Emit a periodic flush message; skip the first, immediate tick so a
            // full interval elapses before the first scheduled flush.
            let ticker = inner_runtime
                .interval(config.scheduled_delay)
                .skip(1)
                .map(|_| BatchMessage::Flush(None));
            let timeout_runtime = inner_runtime.clone();

            let messages = Box::pin(stream::select(message_receiver, ticker));
            let processor = BatchSpanProcessorInternal {
                spans: Vec::new(),
                export_tasks: FuturesUnordered::new(),
                runtime: timeout_runtime,
                config,
                exporter,
            };

            processor.run(messages).await
        }));

        BatchSpanProcessor {
            message_sender,
            dropped_spans_count: AtomicUsize::new(0),
            max_queue_size,
        }
    }

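    /// Creates a builder for a `BatchSpanProcessor` from an exporter and a runtime.
    ///
    /// A minimal sketch (assumes the `rt-tokio` feature; the in-memory exporter is
    /// used purely for illustration), including an optional custom batch config:
    ///
    /// ```ignore
    /// use opentelemetry_sdk::runtime;
    /// use opentelemetry_sdk::trace::{BatchConfigBuilder, BatchSpanProcessor, InMemorySpanExporterBuilder};
    ///
    /// let processor = BatchSpanProcessor::builder(
    ///     InMemorySpanExporterBuilder::new().build(),
    ///     runtime::Tokio,
    /// )
    /// .with_batch_config(BatchConfigBuilder::default().build())
    /// .build();
    /// ```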
    pub fn builder<E>(exporter: E, runtime: R) -> BatchSpanProcessorBuilder<E, R>
    where
        E: SpanExporter,
    {
        BatchSpanProcessorBuilder {
            exporter,
            config: Default::default(),
            runtime,
        }
    }
}

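/// Builder for [`BatchSpanProcessor`]; obtained via [`BatchSpanProcessor::builder`].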
#[derive(Debug)]
pub struct BatchSpanProcessorBuilder<E, R> {
    exporter: E,
    config: BatchConfig,
    runtime: R,
}

impl<E, R> BatchSpanProcessorBuilder<E, R>
where
    E: SpanExporter + 'static,
    R: RuntimeChannel,
{
    /// Overrides the default [`BatchConfig`] used by the processor.
    pub fn with_batch_config(self, config: BatchConfig) -> Self {
        BatchSpanProcessorBuilder { config, ..self }
    }

    /// Builds the [`BatchSpanProcessor`], spawning its background task on the runtime.
    pub fn build(self) -> BatchSpanProcessor<R> {
        BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
    }
}

#[cfg(all(test, feature = "testing", feature = "trace"))]
mod tests {
    use super::{BatchSpanProcessor, SpanProcessor};
    use crate::error::OTelSdkResult;
    use crate::runtime;
    use crate::testing::trace::{new_test_export_span_data, new_tokio_test_exporter};
    use crate::trace::span_processor::{
        OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE,
        OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT,
    };
    use crate::trace::{BatchConfig, BatchConfigBuilder, InMemorySpanExporterBuilder};
    use crate::trace::{SpanData, SpanExporter};
    use futures_util::Future;
    use std::fmt::Debug;
    use std::time::Duration;

    /// Exporter that simulates a slow export by sleeping for `delay_for`.
    struct BlockingExporter<D> {
        delay_for: Duration,
        delay_fn: D,
    }

    impl<D, DS> Debug for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("blocking exporter for testing")
        }
    }

    impl<D, DS> SpanExporter for BlockingExporter<D>
    where
        D: Fn(Duration) -> DS + 'static + Send + Sync,
        DS: Future<Output = ()> + Send + Sync + 'static,
    {
        fn export(
            &mut self,
            _batch: Vec<SpanData>,
        ) -> futures_util::future::BoxFuture<'static, OTelSdkResult> {
            use futures_util::FutureExt;
            Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(())))
        }
    }

    #[test]
    fn test_build_batch_span_processor_builder() {
        let mut env_vars = vec![
            (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            // Invalid value: the schedule delay should fall back to its default.
            (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")),
            (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder = BatchSpanProcessor::builder(
                InMemorySpanExporterBuilder::new().build(),
                runtime::Tokio,
            );
            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT)
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT
            );
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120")));

        temp_env::with_vars(env_vars, || {
            let builder = BatchSpanProcessor::builder(
                InMemorySpanExporterBuilder::new().build(),
                runtime::Tokio,
            );
            // max_export_batch_size is capped at max_queue_size.
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    #[tokio::test]
    async fn test_batch_span_processor() {
        let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter();
        // Use a very long scheduled delay so only an explicit flush triggers export.
        let config = BatchConfigBuilder::default()
            .with_scheduled_delay(Duration::from_secs(60 * 60 * 24))
            .build();
        let processor =
            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
        let handle = tokio::spawn(async move {
            loop {
                if let Some(span) = export_receiver.recv().await {
                    assert_eq!(span.span_context, new_test_export_span_data().span_context);
                    break;
                }
            }
        });
        // Give the background task time to start before sending a span.
        tokio::time::sleep(Duration::from_secs(1)).await;
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        assert!(flush_res.is_ok());
        let _shutdown_result = processor.shutdown();

        assert!(
            tokio::time::timeout(Duration::from_secs(5), handle)
                .await
                .is_ok(),
            "timed out after 5 seconds. force_flush may not have exported any data when called"
        );
    }

    /// Drives a processor whose exporter sleeps for longer (or shorter) than the
    /// configured export timeout and checks that force_flush fails (or succeeds).
    async fn timeout_test_tokio(time_out: bool) {
        let config = BatchConfig {
            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
            // Effectively disable the scheduled flush so only force_flush exports.
            scheduled_delay: Duration::from_secs(60 * 60 * 24),
            ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
            delay_fn: tokio::time::sleep,
        };
        let processor =
            BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread);
        // Give the background task time to start before sending a span.
        tokio::time::sleep(Duration::from_secs(1)).await;
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        if time_out {
            assert!(flush_res.is_err());
        } else {
            assert!(flush_res.is_ok());
        }
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }

    #[test]
    fn test_timeout_tokio_timeout() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(timeout_test_tokio(true));
    }

    #[test]
    fn test_timeout_tokio_not_timeout() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(timeout_test_tokio(false));
    }

    #[test]
    #[cfg(feature = "rt-async-std")]
    fn test_timeout_async_std_timeout() {
        async_std::task::block_on(timeout_test_std_async(true));
    }

    #[test]
    #[cfg(feature = "rt-async-std")]
    fn test_timeout_async_std_not_timeout() {
        async_std::task::block_on(timeout_test_std_async(false));
    }

    #[cfg(feature = "rt-async-std")]
    async fn timeout_test_std_async(time_out: bool) {
        let config = BatchConfig {
            max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }),
            // Effectively disable the scheduled flush so only force_flush exports.
            scheduled_delay: Duration::from_secs(60 * 60 * 24),
            ..Default::default()
        };
        let exporter = BlockingExporter {
            delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }),
            delay_fn: async_std::task::sleep,
        };
        let processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd);
        processor.on_end(new_test_export_span_data());
        let flush_res = processor.force_flush();
        if time_out {
            assert!(flush_res.is_err());
        } else {
            assert!(flush_res.is_ok());
        }
        let shutdown_res = processor.shutdown();
        assert!(shutdown_res.is_ok());
    }
}