use crate::{
export::logs::{ExportResult, LogBatch, LogExporter},
logs::{LogError, LogRecord, LogResult},
runtime::{RuntimeChannel, TrySend},
Resource,
};
use futures_channel::oneshot;
use futures_util::{
future::{self, Either},
{pin_mut, stream, StreamExt as _},
};
#[cfg(feature = "spec_unstable_logs_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
use std::{
fmt::{self, Debug, Formatter},
str::FromStr,
sync::Arc,
time::Duration,
};
const OTEL_BLRP_SCHEDULE_DELAY: &str = "OTEL_BLRP_SCHEDULE_DELAY";
const OTEL_BLRP_SCHEDULE_DELAY_DEFAULT: u64 = 1_000;
const OTEL_BLRP_EXPORT_TIMEOUT: &str = "OTEL_BLRP_EXPORT_TIMEOUT";
const OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
const OTEL_BLRP_MAX_QUEUE_SIZE: &str = "OTEL_BLRP_MAX_QUEUE_SIZE";
const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
pub trait LogProcessor: Send + Sync + Debug {
fn emit(&self, data: &mut LogRecord, instrumentation: &InstrumentationScope);
fn force_flush(&self) -> LogResult<()>;
fn shutdown(&self) -> LogResult<()>;
#[cfg(feature = "spec_unstable_logs_enabled")]
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
true
}
fn set_resource(&self, _resource: &Resource) {}
}
#[derive(Debug)]
pub struct SimpleLogProcessor {
exporter: Mutex<Box<dyn LogExporter>>,
is_shutdown: AtomicBool,
}
impl SimpleLogProcessor {
pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
SimpleLogProcessor {
exporter: Mutex::new(exporter),
is_shutdown: AtomicBool::new(false),
}
}
}
impl LogProcessor for SimpleLogProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
otel_warn!(
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
);
return;
}
let result = self
.exporter
.lock()
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
.and_then(|mut exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
match result {
Err(LogError::MutexPoisoned(_)) => {
otel_debug!(
name: "SimpleLogProcessor.Emit.MutexPoisoning",
);
}
Err(err) => {
otel_error!(
name: "SimpleLogProcessor.Emit.ExportError",
error = format!("{}",err)
);
}
_ => {}
}
}
fn force_flush(&self) -> LogResult<()> {
Ok(())
}
fn shutdown(&self) -> LogResult<()> {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))
}
}
fn set_resource(&self, resource: &Resource) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
}
}
}
pub struct BatchLogProcessor<R: RuntimeChannel> {
message_sender: R::Sender<BatchMessage>,
dropped_logs_count: AtomicUsize,
max_queue_size: usize,
}
impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("BatchLogProcessor")
.field("message_sender", &self.message_sender)
.finish()
}
}
impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
let result = self.message_sender.try_send(BatchMessage::ExportLog((
record.clone(),
instrumentation.clone(),
)));
if result.is_err() {
if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
}
}
}
fn force_flush(&self) -> LogResult<()> {
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Flush(Some(res_sender)))
.map_err(|err| LogError::Other(err.into()))?;
futures_executor::block_on(res_receiver)
.map_err(|err| LogError::Other(err.into()))
.and_then(std::convert::identity)
}
fn shutdown(&self) -> LogResult<()> {
let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_logs > 0 {
otel_warn!(
name: "BatchLogProcessor.LogsDropped",
dropped_logs_count = dropped_logs,
max_queue_size = max_queue_size,
message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
);
}
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Shutdown(res_sender))
.map_err(|err| LogError::Other(err.into()))?;
futures_executor::block_on(res_receiver)
.map_err(|err| LogError::Other(err.into()))
.and_then(std::convert::identity)
}
fn set_resource(&self, resource: &Resource) {
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
.try_send(BatchMessage::SetResource(resource));
}
}
impl<R: RuntimeChannel> BatchLogProcessor<R> {
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
let (message_sender, message_receiver) =
runtime.batch_message_channel(config.max_queue_size);
let inner_runtime = runtime.clone();
let max_queue_size = config.max_queue_size;
runtime.spawn(Box::pin(async move {
let ticker = inner_runtime
.interval(config.scheduled_delay)
.skip(1) .map(|_| BatchMessage::Flush(None));
let timeout_runtime = inner_runtime.clone();
let mut logs = Vec::new();
let mut messages = Box::pin(stream::select(message_receiver, ticker));
while let Some(message) = messages.next().await {
match message {
BatchMessage::ExportLog(log) => {
logs.push(log);
if logs.len() == config.max_export_batch_size {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
logs.split_off(0),
)
.await;
if let Err(err) = result {
otel_error!(
name: "BatchLogProcessor.Export.Error",
error = format!("{}", err)
);
}
}
}
BatchMessage::Flush(res_channel) => {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
logs.split_off(0),
)
.await;
if let Some(channel) = res_channel {
if let Err(send_error) = channel.send(result) {
otel_debug!(
name: "BatchLogProcessor.Flush.SendResultError",
error = format!("{:?}", send_error),
);
}
}
}
BatchMessage::Shutdown(ch) => {
let result = export_with_timeout(
config.max_export_timeout,
exporter.as_mut(),
&timeout_runtime,
logs.split_off(0),
)
.await;
exporter.shutdown();
if let Err(send_error) = ch.send(result) {
otel_debug!(
name: "BatchLogProcessor.Shutdown.SendResultError",
error = format!("{:?}", send_error),
);
}
break;
}
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
}
}
}
}));
BatchLogProcessor {
message_sender,
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
}
}
pub fn builder<E>(exporter: E, runtime: R) -> BatchLogProcessorBuilder<E, R>
where
E: LogExporter,
{
BatchLogProcessorBuilder {
exporter,
config: Default::default(),
runtime,
}
}
}
async fn export_with_timeout<R, E>(
time_out: Duration,
exporter: &mut E,
runtime: &R,
batch: Vec<(LogRecord, InstrumentationScope)>,
) -> ExportResult
where
R: RuntimeChannel,
E: LogExporter + ?Sized,
{
if batch.is_empty() {
return Ok(());
}
let log_vec: Vec<(&LogRecord, &InstrumentationScope)> = batch
.iter()
.map(|log_data| (&log_data.0, &log_data.1))
.collect();
let export = exporter.export(LogBatch::new(log_vec.as_slice()));
let timeout = runtime.delay(time_out);
pin_mut!(export);
pin_mut!(timeout);
match future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
}
}
#[derive(Debug)]
pub struct BatchConfig {
max_queue_size: usize,
scheduled_delay: Duration,
max_export_batch_size: usize,
max_export_timeout: Duration,
}
impl Default for BatchConfig {
fn default() -> Self {
BatchConfigBuilder::default().build()
}
}
#[derive(Debug)]
pub struct BatchConfigBuilder {
max_queue_size: usize,
scheduled_delay: Duration,
max_export_batch_size: usize,
max_export_timeout: Duration,
}
impl Default for BatchConfigBuilder {
fn default() -> Self {
BatchConfigBuilder {
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT),
max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
max_export_timeout: Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT),
}
.init_from_env_vars()
}
}
impl BatchConfigBuilder {
pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
self.max_queue_size = max_queue_size;
self
}
pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
self.scheduled_delay = scheduled_delay;
self
}
pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
self.max_export_timeout = max_export_timeout;
self
}
pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
self.max_export_batch_size = max_export_batch_size;
self
}
pub fn build(self) -> BatchConfig {
let max_export_batch_size = min(self.max_export_batch_size, self.max_queue_size);
BatchConfig {
max_queue_size: self.max_queue_size,
scheduled_delay: self.scheduled_delay,
max_export_timeout: self.max_export_timeout,
max_export_batch_size,
}
}
fn init_from_env_vars(mut self) -> Self {
if let Some(max_queue_size) = env::var(OTEL_BLRP_MAX_QUEUE_SIZE)
.ok()
.and_then(|queue_size| usize::from_str(&queue_size).ok())
{
self.max_queue_size = max_queue_size;
}
if let Some(max_export_batch_size) = env::var(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE)
.ok()
.and_then(|batch_size| usize::from_str(&batch_size).ok())
{
self.max_export_batch_size = max_export_batch_size;
}
if let Some(scheduled_delay) = env::var(OTEL_BLRP_SCHEDULE_DELAY)
.ok()
.and_then(|delay| u64::from_str(&delay).ok())
{
self.scheduled_delay = Duration::from_millis(scheduled_delay);
}
if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
.ok()
.and_then(|s| u64::from_str(&s).ok())
{
self.max_export_timeout = Duration::from_millis(max_export_timeout);
}
self
}
}
#[derive(Debug)]
pub struct BatchLogProcessorBuilder<E, R> {
exporter: E,
config: BatchConfig,
runtime: R,
}
impl<E, R> BatchLogProcessorBuilder<E, R>
where
E: LogExporter + 'static,
R: RuntimeChannel,
{
pub fn with_batch_config(self, config: BatchConfig) -> Self {
BatchLogProcessorBuilder { config, ..self }
}
pub fn build(self) -> BatchLogProcessor<R> {
BatchLogProcessor::new(Box::new(self.exporter), self.config, self.runtime)
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
ExportLog((LogRecord, InstrumentationScope)),
Flush(Option<oneshot::Sender<ExportResult>>),
Shutdown(oneshot::Sender<ExportResult>),
SetResource(Arc<Resource>),
}
#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
use super::{
BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY,
};
use crate::export::logs::{LogBatch, LogExporter};
use crate::logs::LogRecord;
use crate::logs::LogResult;
use crate::testing::logs::InMemoryLogExporterBuilder;
use crate::{
logs::{
log_processor::{
OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
},
BatchConfig, BatchConfigBuilder, LogProcessor, LoggerProvider, SimpleLogProcessor,
},
runtime,
testing::logs::InMemoryLogExporter,
Resource,
};
use async_trait::async_trait;
use opentelemetry::logs::AnyValue;
use opentelemetry::logs::LogRecord as _;
use opentelemetry::logs::{Logger, LoggerProvider as _};
use opentelemetry::KeyValue;
use opentelemetry::{InstrumentationScope, Key};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[derive(Debug, Clone)]
struct MockLogExporter {
resource: Arc<Mutex<Option<Resource>>>,
}
#[async_trait]
impl LogExporter for MockLogExporter {
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> {
Ok(())
}
fn shutdown(&mut self) {}
fn set_resource(&mut self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_opt| {
res_opt.replace(resource.clone());
})
.expect("mock log exporter shouldn't error when setting resource");
}
}
impl MockLogExporter {
fn get_resource(&self) -> Option<Resource> {
(*self.resource).lock().unwrap().clone()
}
}
#[test]
fn test_default_const_values() {
assert_eq!(OTEL_BLRP_SCHEDULE_DELAY, "OTEL_BLRP_SCHEDULE_DELAY");
assert_eq!(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, 1_000);
assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT, "OTEL_BLRP_EXPORT_TIMEOUT");
assert_eq!(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, 30_000);
assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE, "OTEL_BLRP_MAX_QUEUE_SIZE");
assert_eq!(OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, 2_048);
assert_eq!(
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
"OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
);
assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
}
#[test]
fn test_default_batch_config_adheres_to_specification() {
let env_vars = vec![
OTEL_BLRP_SCHEDULE_DELAY,
OTEL_BLRP_EXPORT_TIMEOUT,
OTEL_BLRP_MAX_QUEUE_SIZE,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
];
let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
assert_eq!(
config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
config.max_export_timeout,
Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
);
assert_eq!(config.max_queue_size, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT);
assert_eq!(
config.max_export_batch_size,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT
);
}
#[test]
fn test_batch_config_configurable_by_env_vars() {
let env_vars = vec![
(OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
(OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
];
let config = temp_env::with_vars(env_vars, BatchConfig::default);
assert_eq!(config.scheduled_delay, Duration::from_millis(2000));
assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
assert_eq!(config.max_queue_size, 4096);
assert_eq!(config.max_export_batch_size, 1024);
}
#[test]
fn test_batch_config_max_export_batch_size_validation() {
let env_vars = vec![
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("256")),
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
];
let config = temp_env::with_vars(env_vars, BatchConfig::default);
assert_eq!(config.max_queue_size, 256);
assert_eq!(config.max_export_batch_size, 256);
assert_eq!(
config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
config.max_export_timeout,
Duration::from_millis(OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT)
);
}
#[test]
fn test_batch_config_with_fields() {
let batch = BatchConfigBuilder::default()
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_export_timeout(Duration::from_millis(3))
.with_max_queue_size(4)
.build();
assert_eq!(batch.max_export_batch_size, 1);
assert_eq!(batch.scheduled_delay, Duration::from_millis(2));
assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
assert_eq!(batch.max_queue_size, 4);
}
#[test]
fn test_build_batch_log_processor_builder() {
let mut env_vars = vec![
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("500")),
(OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder =
BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 500);
assert_eq!(
builder.config.scheduled_delay,
Duration::from_millis(OTEL_BLRP_SCHEDULE_DELAY_DEFAULT)
);
assert_eq!(
builder.config.max_queue_size,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
);
assert_eq!(
builder.config.max_export_timeout,
Duration::from_millis(2046)
);
});
env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
temp_env::with_vars(env_vars, || {
let builder =
BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio);
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
});
}
#[test]
fn test_build_batch_log_processor_builder_with_custom_config() {
let expected = BatchConfigBuilder::default()
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_export_timeout(Duration::from_millis(3))
.with_max_queue_size(4)
.build();
let builder = BatchLogProcessor::builder(InMemoryLogExporter::default(), runtime::Tokio)
.with_batch_config(expected);
let actual = &builder.config;
assert_eq!(actual.max_export_batch_size, 1);
assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
assert_eq!(actual.max_queue_size, 4);
}
#[test]
fn test_set_resource_simple_processor() {
let exporter = MockLogExporter {
resource: Arc::new(Mutex::new(None)),
};
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let _ = LoggerProvider::builder()
.with_log_processor(processor)
.with_resource(Resource::new(vec![
KeyValue::new("k1", "v1"),
KeyValue::new("k2", "v3"),
KeyValue::new("k3", "v3"),
KeyValue::new("k4", "v4"),
KeyValue::new("k5", "v5"),
]))
.build();
assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_set_resource_batch_processor() {
let exporter = MockLogExporter {
resource: Arc::new(Mutex::new(None)),
};
let processor = BatchLogProcessor::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);
let provider = LoggerProvider::builder()
.with_log_processor(processor)
.with_resource(Resource::new(vec![
KeyValue::new("k1", "v1"),
KeyValue::new("k2", "v3"),
KeyValue::new("k3", "v3"),
KeyValue::new("k4", "v4"),
KeyValue::new("k5", "v5"),
]))
.build();
tokio::time::sleep(Duration::from_secs(2)).await; assert_eq!(exporter.get_resource().unwrap().into_iter().count(), 5);
let _ = provider.shutdown();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_shutdown() {
let exporter = InMemoryLogExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = BatchLogProcessor::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);
let mut record = LogRecord::default();
let instrumentation = InstrumentationScope::default();
processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
processor.shutdown().unwrap();
processor.emit(&mut record, &instrumentation);
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}
#[test]
fn test_simple_shutdown() {
let exporter = InMemoryLogExporterBuilder::default()
.keep_records_on_shutdown()
.build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor.emit(&mut record, &instrumentation);
processor.shutdown().unwrap();
let is_shutdown = processor
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed);
assert!(is_shutdown);
processor.emit(&mut record, &instrumentation);
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}
#[tokio::test(flavor = "current_thread")]
#[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968"]
async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);
processor.shutdown().unwrap();
}
#[tokio::test(flavor = "current_thread")]
async fn test_batch_log_processor_shutdown_with_async_runtime_current_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
processor.shutdown().unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_multi_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::Tokio,
);
processor.shutdown().unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_batch_log_processor_shutdown_with_async_runtime_multi_flavor_current_thread() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = BatchLogProcessor::new(
Box::new(exporter.clone()),
BatchConfig::default(),
runtime::TokioCurrentThread,
);
processor.shutdown().unwrap();
}
#[derive(Debug)]
struct FirstProcessor {
pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationScope)>>>,
}
impl LogProcessor for FirstProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
record.add_attribute(
Key::from_static_str("processed_by"),
AnyValue::String("FirstProcessor".into()),
);
record.body = Some("Updated by FirstProcessor".into());
self.logs
.lock()
.unwrap()
.push((record.clone(), instrumentation.clone())); }
fn force_flush(&self) -> LogResult<()> {
Ok(())
}
fn shutdown(&self) -> LogResult<()> {
Ok(())
}
}
#[derive(Debug)]
struct SecondProcessor {
pub(crate) logs: Arc<Mutex<Vec<(LogRecord, InstrumentationScope)>>>,
}
impl LogProcessor for SecondProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
assert!(record.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
assert!(
record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
self.logs
.lock()
.unwrap()
.push((record.clone(), instrumentation.clone()));
}
fn force_flush(&self) -> LogResult<()> {
Ok(())
}
fn shutdown(&self) -> LogResult<()> {
Ok(())
}
}
#[test]
fn test_log_data_modification_by_multiple_processors() {
let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
let first_processor = FirstProcessor {
logs: Arc::clone(&first_processor_logs),
};
let second_processor = SecondProcessor {
logs: Arc::clone(&second_processor_logs),
};
let logger_provider = LoggerProvider::builder()
.with_log_processor(first_processor)
.with_log_processor(second_processor)
.build();
let logger = logger_provider.logger("test-logger");
let mut log_record = logger.create_log_record();
log_record.body = Some(AnyValue::String("Test log".into()));
logger.emit(log_record);
assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
let first_log = &first_processor_logs.lock().unwrap()[0];
let second_log = &second_processor_logs.lock().unwrap()[0];
assert!(first_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
assert!(second_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
assert!(
first_log.0.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
assert!(
second_log.0.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
}
#[test]
fn test_simple_processor_sync_exporter_without_runtime() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor.emit(&mut record, &instrumentation);
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_simple_processor_sync_exporter_with_runtime() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor.emit(&mut record, &instrumentation);
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_simple_processor_sync_exporter_with_multi_thread_runtime() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = Arc::new(SimpleLogProcessor::new(Box::new(exporter.clone())));
let mut handles = vec![];
for _ in 0..10 {
let processor_clone = Arc::clone(&processor);
let handle = tokio::spawn(async move {
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor_clone.emit(&mut record, &instrumentation);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 10);
}
#[tokio::test(flavor = "current_thread")]
async fn test_simple_processor_sync_exporter_with_current_thread_runtime() {
let exporter = InMemoryLogExporterBuilder::default().build();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor.emit(&mut record, &instrumentation);
assert_eq!(exporter.get_emitted_logs().unwrap().len(), 1);
}
#[derive(Debug, Clone)]
struct LogExporterThatRequiresTokio {
export_count: Arc<AtomicUsize>,
}
impl LogExporterThatRequiresTokio {
fn new() -> Self {
LogExporterThatRequiresTokio {
export_count: Arc::new(AtomicUsize::new(0)),
}
}
fn len(&self) -> usize {
self.export_count.load(Ordering::Acquire)
}
}
#[async_trait::async_trait]
impl LogExporter for LogExporterThatRequiresTokio {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
tokio::time::sleep(Duration::from_millis(50)).await;
for _ in batch.iter() {
self.export_count.fetch_add(1, Ordering::Acquire);
}
Ok(())
}
}
#[test]
fn test_simple_processor_async_exporter_without_runtime() {
let result = std::panic::catch_unwind(|| {
let exporter = LogExporterThatRequiresTokio::new();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor.emit(&mut record, &instrumentation);
});
assert!(
result.is_err(),
"The test should fail due to missing Tokio runtime, but it did not."
);
let panic_payload = result.unwrap_err();
let panic_message = panic_payload
.downcast_ref::<String>()
.map(|s| s.as_str())
.or_else(|| panic_payload.downcast_ref::<&str>().copied())
.unwrap_or("No panic message");
assert!(
panic_message.contains("no reactor running")
|| panic_message.contains("must be called from the context of a Tokio 1.x runtime"),
"Expected panic message about missing Tokio runtime, but got: {}",
panic_message
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn test_simple_processor_async_exporter_with_all_runtime_worker_threads_blocked() {
let exporter = LogExporterThatRequiresTokio::new();
let processor = Arc::new(SimpleLogProcessor::new(Box::new(exporter.clone())));
let concurrent_emit = 4; let mut handles = vec![];
for _ in 0..concurrent_emit {
let processor_clone = Arc::clone(&processor);
let handle = tokio::spawn(async move {
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor_clone.emit(&mut record, &instrumentation);
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
assert_eq!(exporter.len(), concurrent_emit);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_simple_processor_async_exporter_with_runtime() {
let exporter = LogExporterThatRequiresTokio::new();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor.emit(&mut record, &instrumentation);
assert_eq!(exporter.len(), 1);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_simple_processor_async_exporter_with_multi_thread_runtime() {
let exporter = LogExporterThatRequiresTokio::new();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor.emit(&mut record, &instrumentation);
assert_eq!(exporter.len(), 1);
}
#[tokio::test(flavor = "current_thread")]
#[ignore]
async fn test_simple_processor_async_exporter_with_current_thread_runtime() {
let exporter = LogExporterThatRequiresTokio::new();
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();
processor.emit(&mut record, &instrumentation);
assert_eq!(exporter.len(), 1);
}
}