Skip to content

Commit

Permalink
Processor component provider (#6623)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Aug 28, 2024
1 parent 05fe136 commit 8495996
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.Map;

final class LogRecordProcessorFactory
implements Factory<
Expand Down Expand Up @@ -73,11 +74,26 @@ public LogRecordProcessor create(
closeables, SimpleLogRecordProcessor.create(logRecordExporter));
}

// TODO: add support for generic log record processors
if (!model.getAdditionalProperties().isEmpty()) {
throw new ConfigurationException(
"Unrecognized log record processor(s): "
+ model.getAdditionalProperties().keySet().stream().collect(joining(",", "[", "]")));
Map<String, Object> additionalProperties = model.getAdditionalProperties();
if (additionalProperties.size() > 1) {
throw new ConfigurationException(
"Invalid configuration - multiple log record processors set: "
+ additionalProperties.keySet().stream().collect(joining(",", "[", "]")));
}
Map.Entry<String, Object> processorKeyValue =
additionalProperties.entrySet().stream()
.findFirst()
.orElseThrow(
() ->
new IllegalStateException("Missing processor. This is a programming error."));
LogRecordProcessor logRecordProcessor =
FileConfigUtil.loadComponent(
spiHelper,
LogRecordProcessor.class,
processorKeyValue.getKey(),
processorKeyValue.getValue());
return FileConfigUtil.addAndReturn(closeables, logRecordProcessor);
} else {
throw new ConfigurationException("log processor must be set");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.Map;

final class SpanProcessorFactory
implements Factory<
Expand Down Expand Up @@ -70,11 +71,26 @@ public SpanProcessor create(
return FileConfigUtil.addAndReturn(closeables, SimpleSpanProcessor.create(spanExporter));
}

// TODO: add support for generic span processors
if (!model.getAdditionalProperties().isEmpty()) {
throw new ConfigurationException(
"Unrecognized span processor(s): "
+ model.getAdditionalProperties().keySet().stream().collect(joining(",", "[", "]")));
Map<String, Object> additionalProperties = model.getAdditionalProperties();
if (additionalProperties.size() > 1) {
throw new ConfigurationException(
"Invalid configuration - multiple span processors set: "
+ additionalProperties.keySet().stream().collect(joining(",", "[", "]")));
}
Map.Entry<String, Object> processorKeyValue =
additionalProperties.entrySet().stream()
.findFirst()
.orElseThrow(
() ->
new IllegalStateException("Missing processor. This is a programming error."));
SpanProcessor spanProcessor =
FileConfigUtil.loadComponent(
spiHelper,
SpanProcessor.class,
processorKeyValue.getKey(),
processorKeyValue.getValue());
return FileConfigUtil.addAndReturn(closeables, spanProcessor);
} else {
throw new ConfigurationException("span processor must be set");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.internal.testing.CleanupExtension;
import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException;
import io.opentelemetry.sdk.extension.incubator.fileconfig.component.LogRecordProcessorComponentProvider;
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.BatchLogRecordProcessor;
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.LogRecordExporter;
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.LogRecordProcessor;
Expand All @@ -23,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand Down Expand Up @@ -137,19 +139,35 @@ void create_SimpleConfigured() {
}

@Test
void create_SpiProcessor() {
List<Closeable> closeables = new ArrayList<>();

void create_SpiProcessor_Unknown() {
assertThatThrownBy(
() ->
LogRecordProcessorFactory.getInstance()
.create(
new LogRecordProcessor()
.withAdditionalProperty("test", ImmutableMap.of("key1", "value1")),
.withAdditionalProperty(
"unknown_key", ImmutableMap.of("key1", "value1")),
spiHelper,
closeables))
new ArrayList<>()))
.isInstanceOf(ConfigurationException.class)
.hasMessage("Unrecognized log record processor(s): [test]");
cleanup.addCloseables(closeables);
.hasMessage(
"No component provider detected for io.opentelemetry.sdk.logs.LogRecordProcessor with name \"unknown_key\".");
}

@Test
void create_SpiExporter_Valid() {
io.opentelemetry.sdk.logs.LogRecordProcessor logRecordProcessor =
LogRecordProcessorFactory.getInstance()
.create(
new LogRecordProcessor()
.withAdditionalProperty("test", ImmutableMap.of("key1", "value1")),
spiHelper,
new ArrayList<>());
assertThat(logRecordProcessor)
.isInstanceOf(LogRecordProcessorComponentProvider.TestLogRecordProcessor.class);
Assertions.assertThat(
((LogRecordProcessorComponentProvider.TestLogRecordProcessor) logRecordProcessor)
.config.getString("key1"))
.isEqualTo("value1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,14 @@ void create_PrometheusExporter() {
.MetricExporter()
.withPrometheus(new Prometheus()),
spiHelper,
new ArrayList<>()))
closeables))
.isInstanceOf(ConfigurationException.class)
.hasMessage("prometheus exporter not supported in this context");
cleanup.addCloseables(closeables);
}

@Test
void create_SpiExporter_Unknown() {
List<Closeable> closeables = new ArrayList<>();

assertThatThrownBy(
() ->
MetricExporterFactory.getInstance()
Expand All @@ -233,7 +231,6 @@ void create_SpiExporter_Unknown() {
.isInstanceOf(ConfigurationException.class)
.hasMessage(
"No component provider detected for io.opentelemetry.sdk.metrics.export.MetricExporter with name \"unknown_key\".");
cleanup.addCloseables(closeables);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.internal.testing.CleanupExtension;
import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException;
import io.opentelemetry.sdk.extension.incubator.fileconfig.component.SpanProcessorComponentProvider;
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.BatchSpanProcessor;
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.Otlp;
import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.SimpleSpanProcessor;
Expand All @@ -23,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand Down Expand Up @@ -137,19 +139,34 @@ void create_SimpleConfigured() {
}

@Test
void create_SpiProcessor() {
List<Closeable> closeables = new ArrayList<>();

void create_SpiProcessor_Unknown() {
assertThatThrownBy(
() ->
SpanProcessorFactory.getInstance()
.create(
new SpanProcessor()
.withAdditionalProperty("test", ImmutableMap.of("key1", "value1")),
.withAdditionalProperty(
"unknown_key", ImmutableMap.of("key1", "value1")),
spiHelper,
closeables))
new ArrayList<>()))
.isInstanceOf(ConfigurationException.class)
.hasMessage("Unrecognized span processor(s): [test]");
cleanup.addCloseables(closeables);
.hasMessage(
"No component provider detected for io.opentelemetry.sdk.trace.SpanProcessor with name \"unknown_key\".");
}

@Test
void create_SpiExporter_Valid() {
io.opentelemetry.sdk.trace.SpanProcessor spanProcessor =
SpanProcessorFactory.getInstance()
.create(
new SpanProcessor()
.withAdditionalProperty("test", ImmutableMap.of("key1", "value1")),
spiHelper,
new ArrayList<>());
assertThat(spanProcessor).isInstanceOf(SpanProcessorComponentProvider.TestSpanProcessor.class);
Assertions.assertThat(
((SpanProcessorComponentProvider.TestSpanProcessor) spanProcessor)
.config.getString("key1"))
.isEqualTo("value1");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.extension.incubator.fileconfig.component;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
import io.opentelemetry.sdk.autoconfigure.spi.internal.StructuredConfigProperties;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;

public class LogRecordProcessorComponentProvider implements ComponentProvider<LogRecordProcessor> {
@Override
public Class<LogRecordProcessor> getType() {
return LogRecordProcessor.class;
}

@Override
public String getName() {
return "test";
}

@Override
public LogRecordProcessor create(StructuredConfigProperties config) {
return new TestLogRecordProcessor(config);
}

public static class TestLogRecordProcessor implements LogRecordProcessor {

public final StructuredConfigProperties config;

private TestLogRecordProcessor(StructuredConfigProperties config) {
this.config = config;
}

@Override
public void onEmit(Context context, ReadWriteLogRecord logRecord) {}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.extension.incubator.fileconfig.component;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
import io.opentelemetry.sdk.autoconfigure.spi.internal.StructuredConfigProperties;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;

public class SpanProcessorComponentProvider implements ComponentProvider<SpanProcessor> {
@Override
public Class<SpanProcessor> getType() {
return SpanProcessor.class;
}

@Override
public String getName() {
return "test";
}

@Override
public SpanProcessor create(StructuredConfigProperties config) {
return new TestSpanProcessor(config);
}

public static class TestSpanProcessor implements SpanProcessor {

public final StructuredConfigProperties config;

private TestSpanProcessor(StructuredConfigProperties config) {
this.config = config;
}

@Override
public void onStart(Context parentContext, ReadWriteSpan span) {}

@Override
public boolean isStartRequired() {
return true;
}

@Override
public void onEnd(ReadableSpan span) {}

@Override
public boolean isEndRequired() {
return true;
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
io.opentelemetry.sdk.extension.incubator.fileconfig.component.MetricExporterComponentProvider
io.opentelemetry.sdk.extension.incubator.fileconfig.component.SpanExporterComponentProvider
io.opentelemetry.sdk.extension.incubator.fileconfig.component.LogRecordExporterComponentProvider
io.opentelemetry.sdk.extension.incubator.fileconfig.component.SpanProcessorComponentProvider
io.opentelemetry.sdk.extension.incubator.fileconfig.component.LogRecordProcessorComponentProvider
io.opentelemetry.sdk.extension.incubator.fileconfig.component.ResourceComponentProvider
io.opentelemetry.sdk.extension.incubator.fileconfig.component.ResourceOrderedFirstComponentProvider
io.opentelemetry.sdk.extension.incubator.fileconfig.component.ResourceOrderedSecondComponentProvider

0 comments on commit 8495996

Please sign in to comment.