Skip to content

Commit

Permalink
otel trace source and otel metrics source test coverage (opensearch-p…
Browse files Browse the repository at this point in the history
…roject#5242)

Added security/authentication tests for OtelTraceSource and OTelMetricsSource authentication

Signed-off-by: Jeremy Michael <jsusanto@amazon.com>
  • Loading branch information
jmsusanto authored and sbose2k21 committed Dec 13, 2024
1 parent 9ee47a3 commit 8339853
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
Expand All @@ -35,6 +36,7 @@
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;

import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
Expand Down Expand Up @@ -96,13 +98,15 @@
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -347,6 +351,7 @@ void testHttpFullJsonWithCustomPathAndUnframedRequests() throws InvalidProtocolB
.join();
}


@Test
void testHttpFullJsonWithCustomPathAndAuthHeader_with_successful_response() throws InvalidProtocolBufferException {
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
Expand Down Expand Up @@ -411,6 +416,116 @@ void testHttpFullJsonWithCustomPathAndAuthHeader_with_unsuccessful_response() th
.join();
}

@Test
void testHttpRequestWithInvalidCredentials_with_unsuccessful_response() throws InvalidProtocolBufferException {
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD);
final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig);

when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class)))
.thenReturn(grpcAuthenticationProvider);
when(oTelMetricsSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic",
Map.of(
"username", USERNAME,
"password", PASSWORD
)));
when(oTelMetricsSourceConfig.enableUnframedRequests()).thenReturn(true);
when(oTelMetricsSourceConfig.getPath()).thenReturn(TEST_PATH);

configureObjectUnderTest();
SOURCE.start(buffer);

final String invalidUsername = "wrong_user";
final String invalidPassword = "wrong_password";
final String invalidCredentials = Base64.getEncoder()
.encodeToString(String.format("%s:%s", invalidUsername, invalidPassword).getBytes(StandardCharsets.UTF_8));

final String transformedPath = "/" + TEST_PIPELINE_NAME + "/v1/metrics";

WebClient.of().prepare()
.post("http://127.0.0.1:21891" + transformedPath)
.content(MediaType.JSON_UTF_8, JsonFormat.printer().print(createExportMetricsRequest()).getBytes())
.header("Authorization", "Basic " + invalidCredentials)
.execute()
.aggregate()
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.UNAUTHORIZED, throwable))
.join();
}

@Test
void testGrpcRequestWithInvalidCredentials_with_unsuccessful_response() throws Exception {
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD);
final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig);

when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class)))
.thenReturn(grpcAuthenticationProvider);
when(oTelMetricsSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic",
Map.of(
"username", USERNAME,
"password", PASSWORD
)));
configureObjectUnderTest();
SOURCE.start(buffer);

final String invalidUsername = "wrong_user";
final String invalidPassword = "wrong_password";
final String invalidCredentials = Base64.getEncoder()
.encodeToString(String.format("%s:%s", invalidUsername, invalidPassword).getBytes(StandardCharsets.UTF_8));

final MetricsServiceGrpc.MetricsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT)
.addHeader("Authorization", "Basic " + invalidCredentials)
.build(MetricsServiceGrpc.MetricsServiceBlockingStub.class);

final StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportMetricsRequest()));

assertThat(actualException.getStatus(), notNullValue());
assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNAUTHENTICATED));
}

@Test
void testHttpWithoutSslFailsWhenSslIsEnabled() throws InvalidProtocolBufferException {
when(oTelMetricsSourceConfig.isSsl()).thenReturn(true);
when(oTelMetricsSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt");
when(oTelMetricsSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key");
configureObjectUnderTest();
SOURCE.start(buffer);

WebClient client = WebClient.builder("http://127.0.0.1:21891")
.build();

CompletionException exception = assertThrows(CompletionException.class, () -> client.execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:21891")
.method(HttpMethod.POST)
.path("/opentelemetry.proto.collector.metrics.v1.MetricsService/Export")
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.copyOf(JsonFormat.printer().print(createExportMetricsRequest()).getBytes()))
.aggregate()
.join());

assertThat(exception.getCause(), instanceOf(ClosedSessionException.class));
}

@Test
void testGrpcFailsIfSslIsEnabledAndNoTls() {
when(oTelMetricsSourceConfig.isSsl()).thenReturn(true);
when(oTelMetricsSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt");
when(oTelMetricsSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key");
configureObjectUnderTest();
SOURCE.start(buffer);

MetricsServiceGrpc.MetricsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT)
.build(MetricsServiceGrpc.MetricsServiceBlockingStub.class);

StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportMetricsRequest()));

assertThat(actualException.getStatus(), notNullValue());
assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNKNOWN));
}


@Test
void testServerStartCertFileSuccess() throws IOException {
try (MockedStatic<Server> armeriaServerMock = Mockito.mockStatic(Server.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
Expand Down Expand Up @@ -93,13 +94,15 @@
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -462,6 +465,108 @@ void testHttpFullJsonWithCustomPathAndAuthHeader_with_unsuccessful_response() th
.join();
}

@Test
void testHttpRequestWithInvalidCredentialsShouldReturnUnauthorized() throws InvalidProtocolBufferException {
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD);
final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig);

when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class)))
.thenReturn(grpcAuthenticationProvider);
when(oTelTraceSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic",
Map.of(
"username", USERNAME,
"password", PASSWORD
)));
when(oTelTraceSourceConfig.enableUnframedRequests()).thenReturn(true);
when(oTelTraceSourceConfig.getPath()).thenReturn(TEST_PATH);
configureObjectUnderTest();
SOURCE.start(buffer);

final String transformedPath = "/" + TEST_PIPELINE_NAME + "/v1/traces";

final String invalidUsername = "wrong_user";
final String invalidPassword = "wrong_password";
final String invalidCredentials = Base64.getEncoder()
.encodeToString(String.format("%s:%s", invalidUsername, invalidPassword).getBytes(StandardCharsets.UTF_8));

WebClient.of().prepare()
.post("http://127.0.0.1:21890" + transformedPath)
.content(MediaType.JSON_UTF_8, JsonFormat.printer().print(createExportTraceRequest()).getBytes())
.header("Authorization", "Basic " + invalidCredentials)
.execute()
.aggregate()
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.UNAUTHORIZED, throwable))
.join();
}

@Test
void testGrpcRequestWithoutAuthentication_with_unsuccessful_response() throws Exception {
when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME);
when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD);
final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig);

when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class)))
.thenReturn(grpcAuthenticationProvider);
when(oTelTraceSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic",
Map.of(
"username", USERNAME,
"password", PASSWORD
)));
configureObjectUnderTest();
SOURCE.start(buffer);

final TraceServiceGrpc.TraceServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT)
.build(TraceServiceGrpc.TraceServiceBlockingStub.class);

final StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportTraceRequest()));

assertThat(actualException.getStatus(), notNullValue());
assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNAUTHENTICATED));
}

@Test
void testHttpWithoutSslFailsWhenSslIsEnabled() throws InvalidProtocolBufferException {
when(oTelTraceSourceConfig.isSsl()).thenReturn(true);
when(oTelTraceSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt");
when(oTelTraceSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key");
configureObjectUnderTest();
SOURCE.start(buffer);

WebClient client = WebClient.builder("http://127.0.0.1:21890")
.build();

CompletionException exception = assertThrows(CompletionException.class, () -> client.execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:21890")
.method(HttpMethod.POST)
.path("/opentelemetry.proto.collector.trace.v1.TraceService/Export")
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.copyOf(JsonFormat.printer().print(createExportTraceRequest()).getBytes()))
.aggregate()
.join());

assertThat(exception.getCause(), instanceOf(ClosedSessionException.class));
}

@Test
void testGrpcFailsIfSslIsEnabledAndNoTls() {
when(oTelTraceSourceConfig.isSsl()).thenReturn(true);
when(oTelTraceSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt");
when(oTelTraceSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key");
configureObjectUnderTest();
SOURCE.start(buffer);

TraceServiceGrpc.TraceServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT)
.build(TraceServiceGrpc.TraceServiceBlockingStub.class);

StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportTraceRequest()));

assertThat(actualException.getStatus(), notNullValue());
assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNKNOWN));
}

@Test
void testServerStartCertFileSuccess() throws IOException {
try (MockedStatic<Server> armeriaServerMock = Mockito.mockStatic(Server.class)) {
Expand Down

0 comments on commit 8339853

Please sign in to comment.