use crate::error::{OTelSdkError, OTelSdkResult};
use crate::{
    logs::{LogBatch, LogExporter, SdkLogRecord},
    Resource,
};

use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};

use std::env;
use std::{
    fmt::{self, Debug, Formatter},
    sync::Arc,
};
use std::{
    sync::atomic::{AtomicUsize, Ordering},
    time::Duration,
};

use super::{BatchConfig, LogProcessor};
use crate::runtime::{RuntimeChannel, TrySend};
use futures_channel::oneshot;
use futures_util::{
    future::{self, Either},
    {pin_mut, stream, StreamExt as _},
};

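/// Messages sent from the public [`BatchLogProcessor`] API to the background
/// task that owns the exporter and the buffered batch.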
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
    ExportLog((SdkLogRecord, InstrumentationScope)),
    Flush(Option<oneshot::Sender<OTelSdkResult>>),
    Shutdown(oneshot::Sender<OTelSdkResult>),
    SetResource(Arc<Resource>),
}

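/// A [`LogProcessor`] that buffers log records and exports them in batches
/// from a worker task spawned on the provided async runtime (for example
/// `runtime::Tokio`). Records are exported when the batch reaches
/// `max_export_batch_size`, when the `scheduled_delay` interval elapses, or on
/// an explicit flush/shutdown.
///
/// A minimal usage sketch (paths and feature flags may differ; the in-memory
/// exporter assumes the `testing` feature, and any [`LogExporter`] works the
/// same way):
///
/// ```ignore
/// use opentelemetry_sdk::logs::{InMemoryLogExporter, SdkLoggerProvider};
/// use opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor;
/// use opentelemetry_sdk::runtime;
///
/// let exporter = InMemoryLogExporter::default();
/// let processor = BatchLogProcessor::builder(exporter, runtime::Tokio).build();
/// let provider = SdkLoggerProvider::builder()
///     .with_log_processor(processor)
///     .build();
/// // ... emit logs through a `Logger` obtained from `provider` ...
/// let _ = provider.shutdown();
/// ```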
pub struct BatchLogProcessor<R: RuntimeChannel> {
    message_sender: R::Sender<BatchMessage>,

    dropped_logs_count: AtomicUsize,

    max_queue_size: usize,
}

impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("BatchLogProcessor")
            .field("message_sender", &self.message_sender)
            .finish()
    }
}

impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
        let result = self.message_sender.try_send(BatchMessage::ExportLog((
            record.clone(),
            instrumentation.clone(),
        )));

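        // Count the drop and warn only on the first one, so internal logs are
        // not flooded; the cumulative total is reported once at shutdown.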
        if result.is_err() {
            if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
                otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
                    message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further internal log will be emitted for subsequent drops until Shutdown, at which point the total count of dropped logs will be reported.");
            }
        }
    }

    fn force_flush(&self) -> OTelSdkResult {
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Flush(Some(res_sender)))
            .map_err(|err| OTelSdkError::InternalFailure(format!("{:?}", err)))?;

        futures_executor::block_on(res_receiver)
            .map_err(|err| OTelSdkError::InternalFailure(format!("{:?}", err)))
            .and_then(std::convert::identity)
    }

    fn shutdown(&self) -> OTelSdkResult {
        let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
        let max_queue_size = self.max_queue_size;
        if dropped_logs > 0 {
            otel_warn!(
                name: "BatchLogProcessor.LogsDropped",
                dropped_logs_count = dropped_logs,
                max_queue_size = max_queue_size,
                message = "Logs were dropped due to a queue being full or other error. The count represents the total number of log records dropped over the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decreasing the scheduled delay between exports."
            );
        }
        let (res_sender, res_receiver) = oneshot::channel();
        self.message_sender
            .try_send(BatchMessage::Shutdown(res_sender))
            .map_err(|err| OTelSdkError::InternalFailure(format!("{:?}", err)))?;

        futures_executor::block_on(res_receiver)
            .map_err(|err| OTelSdkError::InternalFailure(format!("{:?}", err)))
            .and_then(std::convert::identity)
    }

    fn set_resource(&self, resource: &Resource) {
        let resource = Arc::new(resource.clone());
        let _ = self
            .message_sender
            .try_send(BatchMessage::SetResource(resource));
    }
}

impl<R: RuntimeChannel> BatchLogProcessor<R> {
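    /// Create the processor and spawn its batching worker on the provided
    /// async runtime. Called by [`BatchLogProcessorBuilder::build`].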
    pub(crate) fn new<E>(mut exporter: E, config: BatchConfig, runtime: R) -> Self
    where
        E: LogExporter + Send + Sync + 'static,
    {
        let (message_sender, message_receiver) =
            runtime.batch_message_channel(config.max_queue_size);
        let inner_runtime = runtime.clone();

        runtime.spawn(Box::pin(async move {
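            // Ticker that drives scheduled flushes. The interval stream fires
            // immediately, so the first tick is skipped to honor `scheduled_delay`.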
            let ticker = inner_runtime
                .interval(config.scheduled_delay)
                .skip(1)
                .map(|_| BatchMessage::Flush(None));
            let timeout_runtime = inner_runtime.clone();
            let mut logs = Vec::new();
            let mut messages = Box::pin(stream::select(message_receiver, ticker));

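            // Handle incoming records, flush requests, resource updates, and
            // ticker-driven flushes until a Shutdown message breaks the loop.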
            while let Some(message) = messages.next().await {
                match message {
                    BatchMessage::ExportLog(log) => {
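                        // Buffer the record; export once the batch is full.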
                        logs.push(log);
                        if logs.len() == config.max_export_batch_size {
                            let result = export_with_timeout(
                                config.max_export_timeout,
                                &mut exporter,
                                &timeout_runtime,
                                logs.split_off(0),
                            )
                            .await;

                            if let Err(err) = result {
                                otel_error!(
                                    name: "BatchLogProcessor.Export.Error",
                                    error = format!("{}", err)
                                );
                            }
                        }
                    }
                    BatchMessage::Flush(res_channel) => {
                        let result = export_with_timeout(
                            config.max_export_timeout,
                            &mut exporter,
                            &timeout_runtime,
                            logs.split_off(0),
                        )
                        .await;

                        if let Some(channel) = res_channel {
                            if let Err(send_error) = channel.send(result) {
                                otel_debug!(
                                    name: "BatchLogProcessor.Flush.SendResultError",
                                    error = format!("{:?}", send_error),
                                );
                            }
                        }
                    }
                    BatchMessage::Shutdown(ch) => {
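                        // Export anything still buffered, shut the exporter
                        // down, then report the result and exit the loop.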
                        let result = export_with_timeout(
                            config.max_export_timeout,
                            &mut exporter,
                            &timeout_runtime,
                            logs.split_off(0),
                        )
                        .await;

                        let _ = exporter.shutdown();
                        if let Err(send_error) = ch.send(result) {
                            otel_debug!(
                                name: "BatchLogProcessor.Shutdown.SendResultError",
                                error = format!("{:?}", send_error),
                            );
                        }
                        break;
                    }
                    BatchMessage::SetResource(resource) => {
                        exporter.set_resource(&resource);
                    }
                }
            }
        }));

        BatchLogProcessor {
            message_sender,
            dropped_logs_count: AtomicUsize::new(0),
            max_queue_size: config.max_queue_size,
        }
    }

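    /// Create a [`BatchLogProcessorBuilder`] that configures a processor for
    /// the given exporter and async runtime.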
    pub fn builder<E>(exporter: E, runtime: R) -> BatchLogProcessorBuilder<E, R>
    where
        E: LogExporter,
    {
        BatchLogProcessorBuilder {
            exporter,
            config: Default::default(),
            runtime,
        }
    }
}

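/// Export `batch` with the given exporter, racing the export future against a
/// runtime delay of `time_out`; an elapsed delay yields `OTelSdkError::Timeout`.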
async fn export_with_timeout<E, R>(
    time_out: Duration,
    exporter: &mut E,
    runtime: &R,
    batch: Vec<(SdkLogRecord, InstrumentationScope)>,
) -> OTelSdkResult
where
    R: RuntimeChannel,
    E: LogExporter + ?Sized,
{
    if batch.is_empty() {
        return Ok(());
    }

    let log_vec: Vec<(&SdkLogRecord, &InstrumentationScope)> = batch
        .iter()
        .map(|log_data| (&log_data.0, &log_data.1))
        .collect();
    let export = exporter.export(LogBatch::new(log_vec.as_slice()));
    let timeout = runtime.delay(time_out);
    pin_mut!(export);
    pin_mut!(timeout);
    match future::select(export, timeout).await {
        Either::Left((export_res, _)) => export_res,
        Either::Right((_, _)) => Err(OTelSdkError::Timeout(time_out)),
    }
}

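/// Builder for [`BatchLogProcessor`]; set a custom [`BatchConfig`] before
/// calling [`BatchLogProcessorBuilder::build`].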
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E, R> {
    exporter: E,
    config: BatchConfig,
    runtime: R,
}

impl<E, R> BatchLogProcessorBuilder<E, R>
where
    E: LogExporter + 'static,
    R: RuntimeChannel,
{
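    /// Replace the default [`BatchConfig`] used by the processor.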
    pub fn with_batch_config(self, config: BatchConfig) -> Self {
        BatchLogProcessorBuilder { config, ..self }
    }

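    /// Build the [`BatchLogProcessor`], spawning its worker loop on the
    /// configured runtime.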
    pub fn build(self) -> BatchLogProcessor<R> {
        BatchLogProcessor::new(self.exporter, self.config, self.runtime)
    }
}

#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
    use crate::error::OTelSdkResult;
    use crate::logs::log_processor::{
        OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE,
        OTEL_BLRP_SCHEDULE_DELAY,
    };
    use crate::logs::log_processor_with_async_runtime::BatchLogProcessor;
    use crate::logs::InMemoryLogExporterBuilder;
    use crate::logs::SdkLogRecord;
    use crate::logs::{LogBatch, LogExporter};
    use crate::runtime;
    use crate::{
        logs::{
            log_processor::{
                OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
            },
            BatchConfig, BatchConfigBuilder, InMemoryLogExporter, LogProcessor, SdkLoggerProvider,
            SimpleLogProcessor,
        },
        Resource,
    };
    use opentelemetry::logs::AnyValue;
    use opentelemetry::logs::LogRecord;
    use opentelemetry::logs::{Logger, LoggerProvider};
    use opentelemetry::KeyValue;
    use opentelemetry::{InstrumentationScope, Key};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    #[derive(Debug, Clone)]
    struct MockLogExporter {
        resource: Arc<Mutex<Option<Resource>>>,
    }

    impl LogExporter for MockLogExporter {
        #[allow(clippy::manual_async_fn)]
        fn export(
            &self,
            _batch: LogBatch<'_>,
        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
            async { Ok(()) }
        }

        fn shutdown(&mut self) -> OTelSdkResult {
            Ok(())
        }

        fn set_resource(&mut self, resource: &Resource) {
            self.resource
                .lock()
                .map(|mut res_opt| {
                    res_opt.replace(resource.clone());
                })
                .expect("mock log exporter shouldn't error when setting resource");
        }
    }

    impl MockLogExporter {
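        // Test helper to inspect the resource most recently passed to `set_resource`.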
        fn get_resource(&self) -> Option<Resource> {
            (*self.resource).lock().unwrap().clone()
        }
    }

    #[test]
    fn test_default_const_values() {
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000);
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000);
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
        assert_eq!(
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
        );
        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
    }

    #[test]
    fn test_default_batch_config_adheres_to_specification() {
        let env_vars = vec![
            OTEL_BLRP_SCHEDULE_DELAY,
            OTEL_BLRP_EXPORT_TIMEOUT,
            OTEL_BLRP_MAX_QUEUE_SIZE,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
        ];

        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);

        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
        );
        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
        assert_eq!(
            config.max_export_batch_size,
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
        );
    }

    #[test]
    fn test_batch_config_configurable_by_env_vars() {
        let env_vars = vec![
            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
        assert_eq!(config.max_queue_size, 4096);
        assert_eq!(config.max_export_batch_size, 1024);
    }

    #[test]
    fn test_batch_config_max_export_batch_size_validation() {
        let env_vars = vec![
            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
        ];

        let config = temp_env::with_vars(env_vars, BatchConfig::default);

        assert_eq!(config.max_queue_size, 256);
        assert_eq!(config.max_export_batch_size, 256);
        assert_eq!(
            config.scheduled_delay,
            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
        );
        assert_eq!(
            config.max_export_timeout,
            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
        );
    }

    #[test]
    fn test_batch_config_with_fields() {
        let batch = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_export_timeout(Duration::from_millis(3))
            .with_max_queue_size(4)
            .build();

        assert_eq!(batch.max_export_batch_size, 1);
        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
        assert_eq!(batch.max_queue_size, 4);
    }

    #[test]
    fn test_build_batch_log_processor_builder() {
        let mut env_vars = vec![
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder =
                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);

            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
            );
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

        temp_env::with_vars(env_vars, || {
            let builder =
                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    #[test]
    fn test_build_batch_log_processor_builder_with_custom_config() {
        let expected = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_export_timeout(Duration::from_millis(3))
            .with_max_queue_size(4)
            .build();

        let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio)
            .with_batch_config(expected);

        let actual = &builder.config;
        assert_eq!(actual.max_export_batch_size, 1);
        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
        assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
        assert_eq!(actual.max_queue_size, 4);
    }

    #[test]
    fn test_set_resource_simple_processor() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor = SimpleLogProcessor::new(exporter.clone());
        let _ = SdkLoggerProvider::builder()
            .with_log_processor(processor)
            .with_resource(
                Resource::builder_empty()
                    .with_attributes([
                        KeyValue::new("k1", "v1"),
                        KeyValue::new("k2", "v3"),
                        KeyValue::new("k3", "v3"),
                        KeyValue::new("k4", "v4"),
                        KeyValue::new("k5", "v5"),
                    ])
                    .build(),
            )
            .build();
        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_set_resource_batch_processor() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor =
            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
        let provider = SdkLoggerProvider::builder()
            .with_log_processor(processor)
            .with_resource(
                Resource::builder_empty()
                    .with_attributes([
                        KeyValue::new("k1", "v1"),
                        KeyValue::new("k2", "v3"),
                        KeyValue::new("k3", "v3"),
                        KeyValue::new("k4", "v4"),
                        KeyValue::new("k5", "v5"),
                    ])
                    .build(),
            )
            .build();

        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
        let _ = provider.shutdown();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_shutdown() {
        let exporter = InMemoryLogExporterBuilder::default()
            .keep_records_on_shutdown()
            .build();
        let processor =
            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

        let mut record = SdkLogRecord::new();
        let instrumentation = InstrumentationScope::default();

        processor.emit(&mut record, &instrumentation);
        processor.force_flush().unwrap();
        processor.shutdown().unwrap();
        processor.emit(&mut record, &instrumentation);
        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(
            exporter.clone(),
            BatchConfig::default(),
            runtime::TokioCurrentThread,
        );

        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
    async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread(
    ) {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(
            exporter.clone(),
            BatchConfig::default(),
            runtime::TokioCurrentThread,
        );

        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(
            exporter.clone(),
            BatchConfig::default(),
            runtime::TokioCurrentThread,
        );
        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor =
            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor =
            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
        processor.shutdown().unwrap();
    }

    #[derive(Debug)]
    struct FirstProcessor {
        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
    }

    impl LogProcessor for FirstProcessor {
        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
            record.add_attribute(
                Key::from_static_str("processed_by"),
                AnyValue::String("FirstProcessor".into()),
            );
            record.body = Some("Updated by FirstProcessor".into());

            self.logs
                .lock()
                .unwrap()
                .push((record.clone(), instrumentation.clone()));
        }

        fn force_flush(&self) -> OTelSdkResult {
            Ok(())
        }

        fn shutdown(&self) -> OTelSdkResult {
            Ok(())
        }
    }

    #[derive(Debug)]
    struct SecondProcessor {
        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
    }

    impl LogProcessor for SecondProcessor {
        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
            assert!(record.attributes_contains(
                &Key::from_static_str("processed_by"),
                &AnyValue::String("FirstProcessor".into())
            ));
            assert!(
                record.body.clone().unwrap()
                    == AnyValue::String("Updated by FirstProcessor".into())
            );
            self.logs
                .lock()
                .unwrap()
                .push((record.clone(), instrumentation.clone()));
        }

        fn force_flush(&self) -> OTelSdkResult {
            Ok(())
        }

        fn shutdown(&self) -> OTelSdkResult {
            Ok(())
        }
    }

    #[test]
    fn test_log_data_modification_by_multiple_processors() {
        let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
        let second_processor_logs = Arc::new(Mutex::new(Vec::new()));

        let first_processor = FirstProcessor {
            logs: Arc::clone(&first_processor_logs),
        };
        let second_processor = SecondProcessor {
            logs: Arc::clone(&second_processor_logs),
        };

        let logger_provider = SdkLoggerProvider::builder()
            .with_log_processor(first_processor)
            .with_log_processor(second_processor)
            .build();

        let logger = logger_provider.logger("test-logger");
        let mut log_record = logger.create_log_record();
        log_record.body = Some(AnyValue::String("Test log".into()));

        logger.emit(log_record);

        assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
        assert_eq!(second_processor_logs.lock().unwrap().len(), 1);

        let first_log = &first_processor_logs.lock().unwrap()[0];
        let second_log = &second_processor_logs.lock().unwrap()[0];

        assert!(first_log.0.attributes_contains(
            &Key::from_static_str("processed_by"),
            &AnyValue::String("FirstProcessor".into())
        ));
        assert!(second_log.0.attributes_contains(
            &Key::from_static_str("processed_by"),
            &AnyValue::String("FirstProcessor".into())
        ));

        assert!(
            first_log.0.body.clone().unwrap()
                == AnyValue::String("Updated by FirstProcessor".into())
        );
        assert!(
            second_log.0.body.clone().unwrap()
                == AnyValue::String("Updated by FirstProcessor".into())
        );
    }

    #[test]
    fn test_build_batch_log_processor_builder_rt() {
        let mut env_vars = vec![
            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
        ];
        temp_env::with_vars(env_vars.clone(), || {
            let builder =
                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);

            assert_eq!(builder.config.max_export_batch_size, 500);
            assert_eq!(
                builder.config.scheduled_delay,
                Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
            );
            assert_eq!(
                builder.config.max_queue_size,
                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
            );
            assert_eq!(
                builder.config.max_export_timeout,
                Duration::from_millis(2046)
            );
        });

        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));

        temp_env::with_vars(env_vars, || {
            let builder =
                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
            assert_eq!(builder.config.max_export_batch_size, 120);
            assert_eq!(builder.config.max_queue_size, 120);
        });
    }

    #[test]
    fn test_build_batch_log_processor_builder_rt_with_custom_config() {
        let expected = BatchConfigBuilder::default()
            .with_max_export_batch_size(1)
            .with_scheduled_delay(Duration::from_millis(2))
            .with_max_export_timeout(Duration::from_millis(3))
            .with_max_queue_size(4)
            .build();

        let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio)
            .with_batch_config(expected);

        let actual = &builder.config;
        assert_eq!(actual.max_export_batch_size, 1);
        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
        assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
        assert_eq!(actual.max_queue_size, 4);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn test_set_resource_batch_processor_rt() {
        let exporter = MockLogExporter {
            resource: Arc::new(Mutex::new(None)),
        };
        let processor =
            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
        let provider = SdkLoggerProvider::builder()
            .with_log_processor(processor)
            .with_resource(
                Resource::builder_empty()
                    .with_attributes([
                        KeyValue::new("k1", "v1"),
                        KeyValue::new("k2", "v3"),
                        KeyValue::new("k3", "v3"),
                        KeyValue::new("k4", "v4"),
                        KeyValue::new("k5", "v5"),
                    ])
                    .build(),
            )
            .build();
        tokio::time::sleep(Duration::from_millis(500)).await;
        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
        let _ = provider.shutdown();
    }

    #[tokio::test(flavor = "multi_thread")]
    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
    async fn test_batch_shutdown_rt() {
        let exporter = InMemoryLogExporterBuilder::default()
            .keep_records_on_shutdown()
            .build();
        let processor =
            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

        let mut record = SdkLogRecord::new();
        let instrumentation = InstrumentationScope::default();

        processor.emit(&mut record, &instrumentation);
        processor.force_flush().unwrap();
        processor.shutdown().unwrap();
        processor.emit(&mut record, &instrumentation);
        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
    }

    #[tokio::test(flavor = "current_thread")]
    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor =
            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread()
    {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(
            exporter.clone(),
            BatchConfig::default(),
            runtime::TokioCurrentThread,
        );

        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor =
            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);

        processor.shutdown().unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() {
        let exporter = InMemoryLogExporterBuilder::default().build();
        let processor = BatchLogProcessor::new(
            exporter.clone(),
            BatchConfig::default(),
            runtime::TokioCurrentThread,
        );

        processor.shutdown().unwrap();
    }
}