opentelemetry_sdk/logs/
log_processor_with_async_runtime.rs

1use crate::error::{OTelSdkError, OTelSdkResult};
2use crate::{
3    logs::{LogBatch, LogExporter, SdkLogRecord},
4    Resource,
5};
6
7use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
8
9use std::env;
10use std::{
11    fmt::{self, Debug, Formatter},
12    sync::Arc,
13};
14use std::{
15    sync::atomic::{AtomicUsize, Ordering},
16    time::Duration,
17};
18
19use super::{BatchConfig, LogProcessor};
20use crate::runtime::{RuntimeChannel, TrySend};
21use futures_channel::oneshot;
22use futures_util::{
23    future::{self, Either},
24    {pin_mut, stream, StreamExt as _},
25};
26
27#[allow(clippy::large_enum_variant)]
28#[derive(Debug)]
29enum BatchMessage {
30    /// Export logs, usually called when the log is emitted.
31    ExportLog((SdkLogRecord, InstrumentationScope)),
32    /// Flush the current buffer to the backend, it can be triggered by
33    /// pre configured interval or a call to `force_push` function.
34    Flush(Option<oneshot::Sender<OTelSdkResult>>),
35    /// Shut down the worker thread, push all logs in buffer to the backend.
36    Shutdown(oneshot::Sender<OTelSdkResult>),
37    /// Set the resource for the exporter.
38    SetResource(Arc<Resource>),
39}
40
41/// A [`LogProcessor`] that asynchronously buffers log records and reports
42/// them at a pre-configured interval.
43pub struct BatchLogProcessor<R: RuntimeChannel> {
44    message_sender: R::Sender<BatchMessage>,
45
46    // Track dropped logs - we'll log this at shutdown
47    dropped_logs_count: AtomicUsize,
48
49    // Track the maximum queue size that was configured for this processor
50    max_queue_size: usize,
51}
52
53impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
54    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
55        f.debug_struct("BatchLogProcessor")
56            .field("message_sender", &self.message_sender)
57            .finish()
58    }
59}
60
61impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
62    fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
63        let result = self.message_sender.try_send(BatchMessage::ExportLog((
64            record.clone(),
65            instrumentation.clone(),
66        )));
67
68        // TODO - Implement throttling to prevent error flooding when the queue is full or closed.
69        if result.is_err() {
70            // Increment dropped logs count. The first time we have to drop a log,
71            // emit a warning.
72            if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
73                otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
74                    message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
75            }
76        }
77    }
78
79    fn force_flush(&self) -> OTelSdkResult {
80        let (res_sender, res_receiver) = oneshot::channel();
81        self.message_sender
82            .try_send(BatchMessage::Flush(Some(res_sender)))
83            .map_err(|err| OTelSdkError::InternalFailure(format!("{:?}", err)))?;
84
85        futures_executor::block_on(res_receiver)
86            .map_err(|err| OTelSdkError::InternalFailure(format!("{:?}", err)))
87            .and_then(std::convert::identity)
88    }
89
90    fn shutdown(&self) -> OTelSdkResult {
91        let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
92        let max_queue_size = self.max_queue_size;
93        if dropped_logs > 0 {
94            otel_warn!(
95                name: "BatchLogProcessor.LogsDropped",
96                dropped_logs_count = dropped_logs,
97                max_queue_size = max_queue_size,
98                message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
99            );
100        }
101        let (res_sender, res_receiver) = oneshot::channel();
102        self.message_sender
103            .try_send(BatchMessage::Shutdown(res_sender))
104            .map_err(|err| OTelSdkError::InternalFailure(format!("{:?}", err)))?;
105
106        futures_executor::block_on(res_receiver)
107            .map_err(|err| OTelSdkError::InternalFailure(format!("{:?}", err)))
108            .and_then(std::convert::identity)
109    }
110
111    fn set_resource(&self, resource: &Resource) {
112        let resource = Arc::new(resource.clone());
113        let _ = self
114            .message_sender
115            .try_send(BatchMessage::SetResource(resource));
116    }
117}
118
119impl<R: RuntimeChannel> BatchLogProcessor<R> {
120    pub(crate) fn new<E>(mut exporter: E, config: BatchConfig, runtime: R) -> Self
121    where
122        E: LogExporter + Send + Sync + 'static,
123    {
124        let (message_sender, message_receiver) =
125            runtime.batch_message_channel(config.max_queue_size);
126        let inner_runtime = runtime.clone();
127
128        // Spawn worker process via user-defined spawn function.
129        runtime.spawn(Box::pin(async move {
130            // Timer will take a reference to the current runtime, so its important we do this within the
131            // runtime.spawn()
132            let ticker = inner_runtime
133                .interval(config.scheduled_delay)
134                .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
135                .map(|_| BatchMessage::Flush(None));
136            let timeout_runtime = inner_runtime.clone();
137            let mut logs = Vec::new();
138            let mut messages = Box::pin(stream::select(message_receiver, ticker));
139
140            while let Some(message) = messages.next().await {
141                match message {
142                    // Log has finished, add to buffer of pending logs.
143                    BatchMessage::ExportLog(log) => {
144                        logs.push(log);
145                        if logs.len() == config.max_export_batch_size {
146                            let result = export_with_timeout(
147                                config.max_export_timeout,
148                                &mut exporter,
149                                &timeout_runtime,
150                                logs.split_off(0),
151                            )
152                            .await;
153
154                            if let Err(err) = result {
155                                otel_error!(
156                                    name: "BatchLogProcessor.Export.Error",
157                                    error = format!("{}", err)
158                                );
159                            }
160                        }
161                    }
162                    // Log batch interval time reached or a force flush has been invoked, export current logs.
163                    BatchMessage::Flush(res_channel) => {
164                        let result = export_with_timeout(
165                            config.max_export_timeout,
166                            &mut exporter,
167                            &timeout_runtime,
168                            logs.split_off(0),
169                        )
170                        .await;
171
172                        if let Some(channel) = res_channel {
173                            if let Err(send_error) = channel.send(result) {
174                                otel_debug!(
175                                    name: "BatchLogProcessor.Flush.SendResultError",
176                                    error = format!("{:?}", send_error),
177                                );
178                            }
179                        }
180                    }
181                    // Stream has terminated or processor is shutdown, return to finish execution.
182                    BatchMessage::Shutdown(ch) => {
183                        let result = export_with_timeout(
184                            config.max_export_timeout,
185                            &mut exporter,
186                            &timeout_runtime,
187                            logs.split_off(0),
188                        )
189                        .await;
190
191                        let _ = exporter.shutdown(); //TODO - handle error
192
193                        if let Err(send_error) = ch.send(result) {
194                            otel_debug!(
195                                name: "BatchLogProcessor.Shutdown.SendResultError",
196                                error = format!("{:?}", send_error),
197                            );
198                        }
199                        break;
200                    }
201                    // propagate the resource
202                    BatchMessage::SetResource(resource) => {
203                        exporter.set_resource(&resource);
204                    }
205                }
206            }
207        }));
208        // Return batch processor with link to worker
209        BatchLogProcessor {
210            message_sender,
211            dropped_logs_count: AtomicUsize::new(0),
212            max_queue_size: config.max_queue_size,
213        }
214    }
215
216    /// Create a new batch processor builder
217    pub fn builder<E>(exporter: E, runtime: R) -> BatchLogProcessorBuilder<E, R>
218    where
219        E: LogExporter,
220    {
221        BatchLogProcessorBuilder {
222            exporter,
223            config: Default::default(),
224            runtime,
225        }
226    }
227}
228
229async fn export_with_timeout<E, R>(
230    time_out: Duration,
231    exporter: &mut E,
232    runtime: &R,
233    batch: Vec<(SdkLogRecord, InstrumentationScope)>,
234) -> OTelSdkResult
235where
236    R: RuntimeChannel,
237    E: LogExporter + ?Sized,
238{
239    if batch.is_empty() {
240        return Ok(());
241    }
242
243    // TBD - Can we avoid this conversion as it involves heap allocation with new vector?
244    let log_vec: Vec<(&SdkLogRecord, &InstrumentationScope)> = batch
245        .iter()
246        .map(|log_data| (&log_data.0, &log_data.1))
247        .collect();
248    let export = exporter.export(LogBatch::new(log_vec.as_slice()));
249    let timeout = runtime.delay(time_out);
250    pin_mut!(export);
251    pin_mut!(timeout);
252    match future::select(export, timeout).await {
253        Either::Left((export_res, _)) => export_res,
254        Either::Right((_, _)) => OTelSdkResult::Err(OTelSdkError::Timeout(time_out)),
255    }
256}
257
258/// A builder for creating [`BatchLogProcessor`] instances.
259///
260#[derive(Debug)]
261pub struct BatchLogProcessorBuilder<E, R> {
262    exporter: E,
263    config: BatchConfig,
264    runtime: R,
265}
266
267impl<E, R> BatchLogProcessorBuilder<E, R>
268where
269    E: LogExporter + 'static,
270    R: RuntimeChannel,
271{
272    /// Set the BatchConfig for [`BatchLogProcessorBuilder`]
273    pub fn with_batch_config(self, config: BatchConfig) -> Self {
274        BatchLogProcessorBuilder { config, ..self }
275    }
276
277    /// Build a batch processor
278    pub fn build(self) -> BatchLogProcessor<R> {
279        BatchLogProcessor::new(self.exporter, self.config, self.runtime)
280    }
281}
282
283#[cfg(all(test, feature = "testing", feature = "logs"))]
284mod tests {
285    use crate::error::OTelSdkResult;
286    use crate::logs::log_processor::{
287        OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE,
288        OTEL_BLRP_SCHEDULE_DELAY,
289    };
290    use crate::logs::log_processor_with_async_runtime::BatchLogProcessor;
291    use crate::logs::InMemoryLogExporterBuilder;
292    use crate::logs::SdkLogRecord;
293    use crate::logs::{LogBatch, LogExporter};
294    use crate::runtime;
295    use crate::{
296        logs::{
297            log_processor::{
298                OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
299                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
300            },
301            BatchConfig, BatchConfigBuilder, InMemoryLogExporter, LogProcessor, SdkLoggerProvider,
302            SimpleLogProcessor,
303        },
304        Resource,
305    };
306    use opentelemetry::logs::AnyValue;
307    use opentelemetry::logs::LogRecord;
308    use opentelemetry::logs::{Logger, LoggerProvider};
309    use opentelemetry::KeyValue;
310    use opentelemetry::{InstrumentationScope, Key};
311    use std::sync::{Arc, Mutex};
312    use std::time::Duration;
313
314    #[derive(Debug, Clone)]
315    struct MockLogExporter {
316        resource: Arc<Mutex<Option<Resource>>>,
317    }
318
319    impl LogExporter for MockLogExporter {
320        #[allow(clippy::manual_async_fn)]
321        fn export(
322            &self,
323            _batch: LogBatch<'_>,
324        ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
325            async { Ok(()) }
326        }
327
328        fn shutdown(&mut self) -> OTelSdkResult {
329            Ok(())
330        }
331
332        fn set_resource(&mut self, resource: &Resource) {
333            self.resource
334                .lock()
335                .map(|mut res_opt| {
336                    res_opt.replace(resource.clone());
337                })
338                .expect("mock log exporter shouldn't error when setting resource");
339        }
340    }
341
342    // Implementation specific to the MockLogExporter, not part of the LogExporter trait
343    impl MockLogExporter {
344        fn get_resource(&self) -> Option<Resource> {
345            (*self.resource).lock().unwrap().clone()
346        }
347    }
348
349    #[test]
350    fn test_default_const_values() {
351        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
352        assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000);
353        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
354        assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000);
355        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
356        assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
357        assert_eq!(
358            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
359            "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
360        );
361        assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
362    }
363
364    #[test]
365    fn test_default_batch_config_adheres_to_specification() {
366        // The following environment variables are expected to be unset so that their default values are used.
367        let env_vars = vec![
368            OTEL_BLRP_SCHEDULE_DELAY,
369            OTEL_BLRP_EXPORT_TIMEOUT,
370            OTEL_BLRP_MAX_QUEUE_SIZE,
371            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
372        ];
373
374        let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
375
376        assert_eq!(
377            config.scheduled_delay,
378            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
379        );
380        assert_eq!(
381            config.max_export_timeout,
382            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
383        );
384        assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
385        assert_eq!(
386            config.max_export_batch_size,
387            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
388        );
389    }
390
391    #[test]
392    fn test_batch_config_configurable_by_env_vars() {
393        let env_vars = vec![
394            (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
395            (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
396            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
397            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
398        ];
399
400        let config = temp_env::with_vars(env_vars, BatchConfig::default);
401
402        assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
403        assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
404        assert_eq!(config.max_queue_size, 4096);
405        assert_eq!(config.max_export_batch_size, 1024);
406    }
407
408    #[test]
409    fn test_batch_config_max_export_batch_size_validation() {
410        let env_vars = vec![
411            (OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
412            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
413        ];
414
415        let config = temp_env::with_vars(env_vars, BatchConfig::default);
416
417        assert_eq!(config.max_queue_size, 256);
418        assert_eq!(config.max_export_batch_size, 256);
419        assert_eq!(
420            config.scheduled_delay,
421            Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
422        );
423        assert_eq!(
424            config.max_export_timeout,
425            Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
426        );
427    }
428
429    #[test]
430    fn test_batch_config_with_fields() {
431        let batch = BatchConfigBuilder::default()
432            .with_max_export_batch_size(1)
433            .with_scheduled_delay(Duration::from_millis(2))
434            .with_max_export_timeout(Duration::from_millis(3))
435            .with_max_queue_size(4)
436            .build();
437
438        assert_eq!(batch.max_export_batch_size, 1);
439        assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
440        assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
441        assert_eq!(batch.max_queue_size, 4);
442    }
443
444    #[test]
445    fn test_build_batch_log_processor_builder() {
446        let mut env_vars = vec![
447            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
448            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
449            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
450        ];
451        temp_env::with_vars(env_vars.clone(), || {
452            let builder =
453                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
454
455            assert_eq!(builder.config.max_export_batch_size, 500);
456            assert_eq!(
457                builder.config.scheduled_delay,
458                Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
459            );
460            assert_eq!(
461                builder.config.max_queue_size,
462                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
463            );
464            assert_eq!(
465                builder.config.max_export_timeout,
466                Duration::from_millis(2046)
467            );
468        });
469
470        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
471
472        temp_env::with_vars(env_vars, || {
473            let builder =
474                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
475            assert_eq!(builder.config.max_export_batch_size, 120);
476            assert_eq!(builder.config.max_queue_size, 120);
477        });
478    }
479
480    #[test]
481    fn test_build_batch_log_processor_builder_with_custom_config() {
482        let expected = BatchConfigBuilder::default()
483            .with_max_export_batch_size(1)
484            .with_scheduled_delay(Duration::from_millis(2))
485            .with_max_export_timeout(Duration::from_millis(3))
486            .with_max_queue_size(4)
487            .build();
488
489        let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio)
490            .with_batch_config(expected);
491
492        let actual = &builder.config;
493        assert_eq!(actual.max_export_batch_size, 1);
494        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
495        assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
496        assert_eq!(actual.max_queue_size, 4);
497    }
498
499    #[test]
500    fn test_set_resource_simple_processor() {
501        let exporter = MockLogExporter {
502            resource: Arc::new(Mutex::new(None)),
503        };
504        let processor = SimpleLogProcessor::new(exporter.clone());
505        let _ = SdkLoggerProvider::builder()
506            .with_log_processor(processor)
507            .with_resource(
508                Resource::builder_empty()
509                    .with_attributes([
510                        KeyValue::new("k1", "v1"),
511                        KeyValue::new("k2", "v3"),
512                        KeyValue::new("k3", "v3"),
513                        KeyValue::new("k4", "v4"),
514                        KeyValue::new("k5", "v5"),
515                    ])
516                    .build(),
517            )
518            .build();
519        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
520    }
521
522    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
523    async fn test_set_resource_batch_processor() {
524        let exporter = MockLogExporter {
525            resource: Arc::new(Mutex::new(None)),
526        };
527        let processor =
528            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
529        let provider = SdkLoggerProvider::builder()
530            .with_log_processor(processor)
531            .with_resource(
532                Resource::builder_empty()
533                    .with_attributes([
534                        KeyValue::new("k1", "v1"),
535                        KeyValue::new("k2", "v3"),
536                        KeyValue::new("k3", "v3"),
537                        KeyValue::new("k4", "v4"),
538                        KeyValue::new("k5", "v5"),
539                    ])
540                    .build(),
541            )
542            .build();
543
544        // wait for the batch processor to process the resource.
545        tokio::time::sleep(Duration::from_millis(100)).await;
546
547        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
548        let _ = provider.shutdown();
549    }
550
551    #[tokio::test(flavor = "multi_thread")]
552    async fn test_batch_shutdown() {
553        // assert we will receive an error
554        // setup
555        let exporter = InMemoryLogExporterBuilder::default()
556            .keep_records_on_shutdown()
557            .build();
558        let processor =
559            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
560
561        let mut record = SdkLogRecord::new();
562        let instrumentation = InstrumentationScope::default();
563
564        processor.emit(&mut record, &instrumentation);
565        processor.force_flush().unwrap();
566        processor.shutdown().unwrap();
567        // todo: expect to see errors here. How should we assert this?
568        processor.emit(&mut record, &instrumentation);
569        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
570    }
571
572    #[tokio::test(flavor = "current_thread")]
573    async fn test_batch_log_processor_shutdown_under_async_runtime_current_flavor_multi_thread() {
574        let exporter = InMemoryLogExporterBuilder::default().build();
575        let processor = BatchLogProcessor::new(
576            exporter.clone(),
577            BatchConfig::default(),
578            runtime::TokioCurrentThread,
579        );
580
581        processor.shutdown().unwrap();
582    }
583
584    #[tokio::test(flavor = "current_thread")]
585    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
586    async fn test_batch_log_processor_with_async_runtime_shutdown_under_async_runtime_current_flavor_multi_thread(
587    ) {
588        let exporter = InMemoryLogExporterBuilder::default().build();
589        let processor = BatchLogProcessor::new(
590            exporter.clone(),
591            BatchConfig::default(),
592            runtime::TokioCurrentThread,
593        );
594
595        //
596        // deadlock happens in shutdown with tokio current_thread runtime
597        //
598        processor.shutdown().unwrap();
599    }
600
601    #[tokio::test(flavor = "current_thread")]
602    async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
603        let exporter = InMemoryLogExporterBuilder::default().build();
604        let processor = BatchLogProcessor::new(
605            exporter.clone(),
606            BatchConfig::default(),
607            runtime::TokioCurrentThread,
608        );
609        processor.shutdown().unwrap();
610    }
611
612    #[tokio::test(flavor = "multi_thread")]
613    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
614        let exporter = InMemoryLogExporterBuilder::default().build();
615        let processor =
616            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
617        processor.shutdown().unwrap();
618    }
619
620    #[tokio::test(flavor = "multi_thread")]
621    async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
622        let exporter = InMemoryLogExporterBuilder::default().build();
623        let processor =
624            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
625        processor.shutdown().unwrap();
626    }
627
628    #[derive(Debug)]
629    struct FirstProcessor {
630        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
631    }
632
633    impl LogProcessor for FirstProcessor {
634        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
635            // add attribute
636            record.add_attribute(
637                Key::from_static_str("processed_by"),
638                AnyValue::String("FirstProcessor".into()),
639            );
640            // update body
641            record.body = Some("Updated by FirstProcessor".into());
642
643            self.logs
644                .lock()
645                .unwrap()
646                .push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
647        }
648
649        fn force_flush(&self) -> OTelSdkResult {
650            Ok(())
651        }
652
653        fn shutdown(&self) -> OTelSdkResult {
654            Ok(())
655        }
656    }
657
658    #[derive(Debug)]
659    struct SecondProcessor {
660        pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
661    }
662
663    impl LogProcessor for SecondProcessor {
664        fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
665            assert!(record.attributes_contains(
666                &Key::from_static_str("processed_by"),
667                &AnyValue::String("FirstProcessor".into())
668            ));
669            assert!(
670                record.body.clone().unwrap()
671                    == AnyValue::String("Updated by FirstProcessor".into())
672            );
673            self.logs
674                .lock()
675                .unwrap()
676                .push((record.clone(), instrumentation.clone()));
677        }
678
679        fn force_flush(&self) -> OTelSdkResult {
680            Ok(())
681        }
682
683        fn shutdown(&self) -> OTelSdkResult {
684            Ok(())
685        }
686    }
687    #[test]
688    fn test_log_data_modification_by_multiple_processors() {
689        let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
690        let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
691
692        let first_processor = FirstProcessor {
693            logs: Arc::clone(&first_processor_logs),
694        };
695        let second_processor = SecondProcessor {
696            logs: Arc::clone(&second_processor_logs),
697        };
698
699        let logger_provider = SdkLoggerProvider::builder()
700            .with_log_processor(first_processor)
701            .with_log_processor(second_processor)
702            .build();
703
704        let logger = logger_provider.logger("test-logger");
705        let mut log_record = logger.create_log_record();
706        log_record.body = Some(AnyValue::String("Test log".into()));
707
708        logger.emit(log_record);
709
710        assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
711        assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
712
713        let first_log = &first_processor_logs.lock().unwrap()[0];
714        let second_log = &second_processor_logs.lock().unwrap()[0];
715
716        assert!(first_log.0.attributes_contains(
717            &Key::from_static_str("processed_by"),
718            &AnyValue::String("FirstProcessor".into())
719        ));
720        assert!(second_log.0.attributes_contains(
721            &Key::from_static_str("processed_by"),
722            &AnyValue::String("FirstProcessor".into())
723        ));
724
725        assert!(
726            first_log.0.body.clone().unwrap()
727                == AnyValue::String("Updated by FirstProcessor".into())
728        );
729        assert!(
730            second_log.0.body.clone().unwrap()
731                == AnyValue::String("Updated by FirstProcessor".into())
732        );
733    }
734
735    #[test]
736    fn test_build_batch_log_processor_builder_rt() {
737        let mut env_vars = vec![
738            (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
739            (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
740            (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
741        ];
742        temp_env::with_vars(env_vars.clone(), || {
743            let builder =
744                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
745
746            assert_eq!(builder.config.max_export_batch_size, 500);
747            assert_eq!(
748                builder.config.scheduled_delay,
749                Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
750            );
751            assert_eq!(
752                builder.config.max_queue_size,
753                OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
754            );
755            assert_eq!(
756                builder.config.max_export_timeout,
757                Duration::from_millis(2046)
758            );
759        });
760
761        env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
762
763        temp_env::with_vars(env_vars, || {
764            let builder =
765                BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
766            assert_eq!(builder.config.max_export_batch_size, 120);
767            assert_eq!(builder.config.max_queue_size, 120);
768        });
769    }
770
771    #[test]
772    fn test_build_batch_log_processor_builder_rt_with_custom_config() {
773        let expected = BatchConfigBuilder::default()
774            .with_max_export_batch_size(1)
775            .with_scheduled_delay(Duration::from_millis(2))
776            .with_max_export_timeout(Duration::from_millis(3))
777            .with_max_queue_size(4)
778            .build();
779
780        let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio)
781            .with_batch_config(expected);
782
783        let actual = &builder.config;
784        assert_eq!(actual.max_export_batch_size, 1);
785        assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
786        assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
787        assert_eq!(actual.max_queue_size, 4);
788    }
789
790    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
791    async fn test_set_resource_batch_processor_rt() {
792        let exporter = MockLogExporter {
793            resource: Arc::new(Mutex::new(None)),
794        };
795        let processor =
796            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
797        let provider = SdkLoggerProvider::builder()
798            .with_log_processor(processor)
799            .with_resource(Resource::new(vec![
800                KeyValue::new("k1", "v1"),
801                KeyValue::new("k2", "v3"),
802                KeyValue::new("k3", "v3"),
803                KeyValue::new("k4", "v4"),
804                KeyValue::new("k5", "v5"),
805            ]))
806            .build();
807        tokio::time::sleep(Duration::from_millis(500)).await; // set resource in batch log processor is not blocking. Should we make it blocking?
808        assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
809        let _ = provider.shutdown();
810    }
811
812    #[tokio::test(flavor = "multi_thread")]
813    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
814    async fn test_batch_shutdown_rt() {
815        // assert we will receive an error
816        // setup
817        let exporter = InMemoryLogExporterBuilder::default()
818            .keep_records_on_shutdown()
819            .build();
820        let processor =
821            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
822
823        let mut record = SdkLogRecord::new();
824        let instrumentation = InstrumentationScope::default();
825
826        processor.emit(&mut record, &instrumentation);
827        processor.force_flush().unwrap();
828        processor.shutdown().unwrap();
829        // todo: expect to see errors here. How should we assert this?
830        processor.emit(&mut record, &instrumentation);
831        assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
832    }
833
834    #[tokio::test(flavor = "current_thread")]
835    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
836    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread() {
837        let exporter = InMemoryLogExporterBuilder::default().build();
838        let processor =
839            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
840
841        //
842        // deadlock happens in shutdown with tokio current_thread runtime
843        //
844        processor.shutdown().unwrap();
845    }
846
847    #[tokio::test(flavor = "current_thread")]
848    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread()
849    {
850        let exporter = InMemoryLogExporterBuilder::default().build();
851        let processor = BatchLogProcessor::new(
852            exporter.clone(),
853            BatchConfig::default(),
854            runtime::TokioCurrentThread,
855        );
856
857        processor.shutdown().unwrap();
858    }
859
860    #[tokio::test(flavor = "multi_thread")]
861    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread() {
862        let exporter = InMemoryLogExporterBuilder::default().build();
863        let processor =
864            BatchLogProcessor::new(exporter.clone(), BatchConfig::default(), runtime::Tokio);
865
866        processor.shutdown().unwrap();
867    }
868
869    #[tokio::test(flavor = "multi_thread")]
870    async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread() {
871        let exporter = InMemoryLogExporterBuilder::default().build();
872        let processor = BatchLogProcessor::new(
873            exporter.clone(),
874            BatchConfig::default(),
875            runtime::TokioCurrentThread,
876        );
877
878        processor.shutdown().unwrap();
879    }
880}