Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
2e07dde
Add AWS CloudWatch integration through Event Listener
adnanhemani Jun 27, 2025
06eaca4
cleanup
adnanhemani Jun 27, 2025
1515fbb
spotlessapply
adnanhemani Jun 27, 2025
854501b
Added unit test with LocalStack
adnanhemani Jul 12, 2025
c1c94b2
typo
adnanhemani Jul 12, 2025
ab3c5f9
spotlessapply
adnanhemani Jul 12, 2025
ab9ccbe
Merge remote-tracking branch 'origin/main' into ahemani/cloudwatch_ev…
adnanhemani Jul 12, 2025
a641136
recompile from main
adnanhemani Jul 12, 2025
5a355d1
first revision change, based on review from @eric-maynard
adnanhemani Jul 16, 2025
bab4439
merge from origin/main
adnanhemani Jul 16, 2025
04c310a
spotlessapply
adnanhemani Jul 16, 2025
d4b44ff
Merge branch 'main' into ahemani/cloudwatch_event_listener
adnanhemani Jul 16, 2025
4d0554a
injected securitycontext and callcontext
adnanhemani Jul 17, 2025
cc715ad
todo
adnanhemani Jul 17, 2025
518aaaa
modify test
adnanhemani Jul 17, 2025
8758255
first draft of revision
adnanhemani Jul 20, 2025
f3f62a0
resolve comments from @eric-maynard and @snazy
adnanhemani Jul 21, 2025
d21dabc
refactor into separate package
adnanhemani Jul 21, 2025
9054511
typo
adnanhemani Jul 21, 2025
828760a
revising comments from @eric-maynard
adnanhemani Jul 22, 2025
ae79600
Merge branch 'main' into ahemani/cloudwatch_event_listener
adnanhemani Jul 22, 2025
9d47684
spotlessapply
adnanhemani Jul 22, 2025
025de74
revision on review from @singhpk234
adnanhemani Aug 4, 2025
e4ec3f8
resolve conflicts
adnanhemani Aug 4, 2025
491ea3a
resolve conflicts, pt. 2
adnanhemani Aug 4, 2025
d453660
spotlessapply
adnanhemani Aug 4, 2025
f89b0ae
spotlessapply again
adnanhemani Aug 4, 2025
e5c02b7
address comments from @RussellSpitzer
adnanhemani Aug 6, 2025
4f8a15b
merge from main
adnanhemani Aug 6, 2025
1305321
prior to manual test
adnanhemani Aug 13, 2025
27f28f4
addressing comments from @snazy
adnanhemani Aug 13, 2025
6b42071
Merge remote-tracking branch 'origin/main' into ahemani/cloudwatch_ev…
adnanhemani Aug 13, 2025
ec2bee8
merge from main
adnanhemani Aug 13, 2025
69b7feb
spotlesscheck
adnanhemani Aug 13, 2025
e8b5e93
documentation updates
adnanhemani Aug 13, 2025
3030d6a
review comments from @RussellSpitzer
adnanhemani Aug 13, 2025
b9abab6
removed mocked tests, as per review from @RussellSpitzer
adnanhemani Aug 13, 2025
4c91c57
Address comments from @RussellSpitzer and merge from main
adnanhemani Aug 22, 2025
b0c6160
typo
adnanhemani Aug 22, 2025
44fad9a
spotlessapply
adnanhemani Aug 22, 2025
e8447a9
fix docstrings
adnanhemani Aug 25, 2025
688ec97
refactor
adnanhemani Aug 25, 2025
360cb91
use awaitility
adnanhemani Aug 25, 2025
7c09d9b
Add negative case testing
adnanhemani Aug 28, 2025
5e34884
spotlessapply
adnanhemani Aug 28, 2025
e2ed743
Revision based on comments from @eric-maynard and @singhpk234
adnanhemani Sep 1, 2025
a870aea
Addressing comments from @singhpk234
adnanhemani Sep 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ jakarta-validation-api = { module = "jakarta.validation:jakarta.validation-api",
jakarta-ws-rs-api = { module = "jakarta.ws.rs:jakarta.ws.rs-api", version = "4.0.0" }
javax-servlet-api = { module = "javax.servlet:javax.servlet-api", version = "4.0.1" }
junit-bom = { module = "org.junit:junit-bom", version = "5.13.4" }
localstack = { module = "org.testcontainers:localstack", version = "1.19.7" }
keycloak-admin-client = { module = "org.keycloak:keycloak-admin-client", version = "26.0.6" }
logback-classic = { module = "ch.qos.logback:logback-classic", version = "1.5.18" }
micrometer-bom = { module = "io.micrometer:micrometer-bom", version = "1.15.3" }
Expand Down
4 changes: 4 additions & 0 deletions runtime/service/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ dependencies {
implementation("software.amazon.awssdk:sts")
implementation("software.amazon.awssdk:iam-policy-builder")
implementation("software.amazon.awssdk:s3")
implementation("software.amazon.awssdk:cloudwatchlogs")
implementation("software.amazon.awssdk:apache-client") {
exclude("commons-logging", "commons-logging")
}
Expand Down Expand Up @@ -144,6 +145,9 @@ dependencies {
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("io.quarkus:quarkus-rest-client-jackson")
testImplementation("io.rest-assured:rest-assured")
testImplementation(libs.localstack)
testImplementation("org.testcontainers:testcontainers")
testImplementation(project(":polaris-container-spec-helper"))

testImplementation(libs.threeten.extra)
testImplementation(libs.hawkular.agent.prometheus.scraper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
@ConfigMapping(prefix = "polaris.event-listener")
public interface PolarisEventListenerConfiguration {
/**
* The type of the event listener to use. Must be a registered {@link
* org.apache.polaris.service.events.PolarisEventListener} identifier.
* The type of the event listener to use. Must be a registered {@link PolarisEventListener}
* identifier.
*/
String type();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.events.jsonEventListener;

import java.util.HashMap;
import org.apache.polaris.service.events.AfterTableRefreshedEvent;
import org.apache.polaris.service.events.PolarisEventListener;

/**
* This class provides a common framework for transforming Polaris events into a HashMap, which can
* be used to transform the event further, such as transforming into a JSON string, and send them to
* various destinations. Concrete implementations should override the
* {{@code @link#transformAndSendEvent(HashMap)}} method to define how the event data should be
* transformed into a JSON string, transmitted, and/or stored.
*/
public abstract class PropertyMapEventListener extends PolarisEventListener {
protected abstract void transformAndSendEvent(HashMap<String, Object> properties);

@Override
public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {
HashMap<String, Object> properties = new HashMap<>();
properties.put("event_type", event.getClass().getSimpleName());
properties.put("table_identifier", event.tableIdentifier().toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] wondering if we need catalog name too in this ? lets say i have a same namespace and table in the different catalogs in the realm ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this but we should put this as part of the instrumentation of the event itself. I will look into this for #2480.

transformAndSendEvent(properties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch;

/** Configuration interface for AWS CloudWatch event listener settings. */
public interface AwsCloudWatchConfiguration {
String awsCloudWatchLogGroup();

String awsCloudWatchLogStream();

String awsCloudWatchRegion();

boolean synchronousMode();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.smallrye.common.annotation.Identifier;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.SecurityContext;
import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.polaris.core.auth.PolarisPrincipal;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer;
import org.apache.polaris.service.events.jsonEventListener.PropertyMapEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsAsyncClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;

@ApplicationScoped
@Identifier("aws-cloudwatch")
public class AwsCloudWatchEventListener extends PropertyMapEventListener {
private static final Logger LOGGER = LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
final ObjectMapper objectMapper = new ObjectMapper();

private CloudWatchLogsAsyncClient client;

private final String logGroup;
private final String logStream;
private final Region region;
private final boolean synchronousMode;
private final Clock clock;

@Inject CallContext callContext;

@Context SecurityContext securityContext;

@Inject
public AwsCloudWatchEventListener(
AwsCloudWatchConfiguration config,
Clock clock,
PolarisIcebergObjectMapperCustomizer customizer) {
this.logStream = config.awsCloudWatchLogStream();
this.logGroup = config.awsCloudWatchLogGroup();
this.region = Region.of(config.awsCloudWatchRegion());
this.synchronousMode = config.synchronousMode();
this.clock = clock;
customizer.customize(this.objectMapper);
}

@PostConstruct
void start() {
this.client = createCloudWatchAsyncClient();
ensureLogGroupAndStream();
}

protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() {
return CloudWatchLogsAsyncClient.builder().region(region).build();
}

private void ensureLogGroupAndStream() {
ensureResourceExists(
() ->
client
.describeLogGroups(
DescribeLogGroupsRequest.builder().logGroupNamePrefix(logGroup).build())
.join()
.logGroups()
.stream()
.anyMatch(g -> g.logGroupName().equals(logGroup)),
() ->
client
.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build())
.join(),
"group",
logGroup);
ensureResourceExists(
() ->
client
.describeLogStreams(
DescribeLogStreamsRequest.builder()
.logGroupName(logGroup)
.logStreamNamePrefix(logStream)
.build())
.join()
.logStreams()
.stream()
.anyMatch(s -> s.logStreamName().equals(logStream)),
() ->
client
.createLogStream(
CreateLogStreamRequest.builder()
.logGroupName(logGroup)
.logStreamName(logStream)
.build())
.join(),
"stream",
logStream);
}

private static void ensureResourceExists(
Supplier<Boolean> existsCheck,
Runnable createAction,
String resourceType,
String resourceName) {
if (existsCheck.get()) {
LOGGER.debug("Log {} [{}] already exists", resourceType, resourceName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOGGER.debug("Log {} [{}] already exists", resourceType, resourceName);
LOGGER.debug("Resource {} [{}] already exists", resourceType, resourceName);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No - the resourceType here will be "stream" or "group" to make a final log line stating "Log stream [xyz] already exists"

} else {
LOGGER.debug("Attempting to create log {}: {}", resourceType, resourceName);
createAction.run();
}
}

@PreDestroy
void shutdown() {
if (client != null) {
client.close();
client = null;
}
}

@Override
protected void transformAndSendEvent(HashMap<String, Object> properties) {
properties.put("realm_id", callContext.getRealmContext().getRealmIdentifier());
properties.put("principal", securityContext.getUserPrincipal().getName());
properties.put(
"activated_roles", ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles());
// TODO: Add request ID when it is available
String eventAsJson;
try {
eventAsJson = objectMapper.writeValueAsString(properties);
} catch (JsonProcessingException e) {
LOGGER.error("Error processing event into JSON string: ", e);
LOGGER.debug("Failed to convert the following object into JSON string: {}", properties);
return;
}
InputLogEvent inputLogEvent =
InputLogEvent.builder().message(eventAsJson).timestamp(clock.millis()).build();
PutLogEventsRequest.Builder requestBuilder =
PutLogEventsRequest.builder()
.logGroupName(logGroup)
.logStreamName(logStream)
.logEvents(List.of(inputLogEvent));
CompletableFuture<PutLogEventsResponse> future =
client
.putLogEvents(requestBuilder.build())
.whenComplete(
(resp, err) -> {
if (err != null) {
LOGGER.error(
"Error writing log to CloudWatch. Event: {}, Error: ", inputLogEvent, err);
}
});
if (synchronousMode) {
future.join();
}
}
}
Loading