Skip to content

Commit 20753ed

Browse files
authored
AWS CloudWatch Event Sink Implementation (#1965)
1 parent c3f5001 commit 20753ed

File tree

9 files changed

+781
-40
lines changed

9 files changed

+781
-40
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ jakarta-ws-rs-api = { module = "jakarta.ws.rs:jakarta.ws.rs-api", version = "4.0
7676
jandex = { module = "io.smallrye.jandex:jandex", version ="3.4.0" }
7777
javax-servlet-api = { module = "javax.servlet:javax.servlet-api", version = "4.0.1" }
7878
junit-bom = { module = "org.junit:junit-bom", version = "5.13.4" }
79+
localstack = { module = "org.testcontainers:localstack", version = "1.19.7" }
7980
keycloak-admin-client = { module = "org.keycloak:keycloak-admin-client", version = "26.0.6" }
8081
jcstress-core = { module = "org.openjdk.jcstress:jcstress-core", version = "0.16" }
8182
jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" }

runtime/service/build.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ dependencies {
8787
implementation("software.amazon.awssdk:sts")
8888
implementation("software.amazon.awssdk:iam-policy-builder")
8989
implementation("software.amazon.awssdk:s3")
90+
implementation("software.amazon.awssdk:cloudwatchlogs")
9091
implementation("software.amazon.awssdk:apache-client") {
9192
exclude("commons-logging", "commons-logging")
9293
}
@@ -128,6 +129,9 @@ dependencies {
128129
testImplementation("io.quarkus:quarkus-rest-client")
129130
testImplementation("io.quarkus:quarkus-rest-client-jackson")
130131
testImplementation("io.rest-assured:rest-assured")
132+
testImplementation(libs.localstack)
133+
testImplementation("org.testcontainers:testcontainers")
134+
testImplementation(project(":polaris-container-spec-helper"))
131135

132136
testImplementation(libs.threeten.extra)
133137
testImplementation(libs.hawkular.agent.prometheus.scraper)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.service.events.jsonEventListener;
21+
22+
import java.util.HashMap;
23+
import org.apache.polaris.service.events.AfterTableRefreshedEvent;
24+
import org.apache.polaris.service.events.PolarisEventListener;
25+
26+
/**
27+
* This class provides a common framework for transforming Polaris events into a HashMap, which can
28+
* be used to transform the event further, such as transforming into a JSON string, and send them to
29+
* various destinations. Concrete implementations should override the
30+
* {{@code @link#transformAndSendEvent(HashMap)}} method to define how the event data should be
31+
* transformed into a JSON string, transmitted, and/or stored.
32+
*/
33+
public abstract class PropertyMapEventListener extends PolarisEventListener {
34+
protected abstract void transformAndSendEvent(HashMap<String, Object> properties);
35+
36+
@Override
37+
public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {
38+
HashMap<String, Object> properties = new HashMap<>();
39+
properties.put("event_type", event.getClass().getSimpleName());
40+
properties.put("table_identifier", event.tableIdentifier().toString());
41+
transformAndSendEvent(properties);
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch;
21+
22+
/** Configuration interface for AWS CloudWatch event listener settings. */
23+
public interface AwsCloudWatchConfiguration {
24+
String awsCloudWatchLogGroup();
25+
26+
String awsCloudWatchLogStream();
27+
28+
String awsCloudWatchRegion();
29+
30+
boolean synchronousMode();
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch;
21+
22+
import com.fasterxml.jackson.core.JsonProcessingException;
23+
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import io.smallrye.common.annotation.Identifier;
25+
import jakarta.annotation.PostConstruct;
26+
import jakarta.annotation.PreDestroy;
27+
import jakarta.enterprise.context.ApplicationScoped;
28+
import jakarta.inject.Inject;
29+
import jakarta.ws.rs.core.Context;
30+
import jakarta.ws.rs.core.SecurityContext;
31+
import java.time.Clock;
32+
import java.util.HashMap;
33+
import java.util.List;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.function.Supplier;
36+
import org.apache.polaris.core.auth.PolarisPrincipal;
37+
import org.apache.polaris.core.context.CallContext;
38+
import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer;
39+
import org.apache.polaris.service.events.jsonEventListener.PropertyMapEventListener;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
import software.amazon.awssdk.regions.Region;
43+
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsAsyncClient;
44+
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
45+
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
46+
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogGroupsRequest;
47+
import software.amazon.awssdk.services.cloudwatchlogs.model.DescribeLogStreamsRequest;
48+
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
49+
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
50+
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
51+
52+
@ApplicationScoped
53+
@Identifier("aws-cloudwatch")
54+
public class AwsCloudWatchEventListener extends PropertyMapEventListener {
55+
private static final Logger LOGGER = LoggerFactory.getLogger(AwsCloudWatchEventListener.class);
56+
final ObjectMapper objectMapper = new ObjectMapper();
57+
58+
private CloudWatchLogsAsyncClient client;
59+
60+
private final String logGroup;
61+
private final String logStream;
62+
private final Region region;
63+
private final boolean synchronousMode;
64+
private final Clock clock;
65+
66+
@Inject CallContext callContext;
67+
68+
@Context SecurityContext securityContext;
69+
70+
@Inject
71+
public AwsCloudWatchEventListener(
72+
AwsCloudWatchConfiguration config,
73+
Clock clock,
74+
PolarisIcebergObjectMapperCustomizer customizer) {
75+
this.logStream = config.awsCloudWatchLogStream();
76+
this.logGroup = config.awsCloudWatchLogGroup();
77+
this.region = Region.of(config.awsCloudWatchRegion());
78+
this.synchronousMode = config.synchronousMode();
79+
this.clock = clock;
80+
customizer.customize(this.objectMapper);
81+
}
82+
83+
@PostConstruct
84+
void start() {
85+
this.client = createCloudWatchAsyncClient();
86+
ensureLogGroupAndStream();
87+
}
88+
89+
protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() {
90+
return CloudWatchLogsAsyncClient.builder().region(region).build();
91+
}
92+
93+
private void ensureLogGroupAndStream() {
94+
ensureResourceExists(
95+
() ->
96+
client
97+
.describeLogGroups(
98+
DescribeLogGroupsRequest.builder().logGroupNamePrefix(logGroup).build())
99+
.join()
100+
.logGroups()
101+
.stream()
102+
.anyMatch(g -> g.logGroupName().equals(logGroup)),
103+
() ->
104+
client
105+
.createLogGroup(CreateLogGroupRequest.builder().logGroupName(logGroup).build())
106+
.join(),
107+
"group",
108+
logGroup);
109+
ensureResourceExists(
110+
() ->
111+
client
112+
.describeLogStreams(
113+
DescribeLogStreamsRequest.builder()
114+
.logGroupName(logGroup)
115+
.logStreamNamePrefix(logStream)
116+
.build())
117+
.join()
118+
.logStreams()
119+
.stream()
120+
.anyMatch(s -> s.logStreamName().equals(logStream)),
121+
() ->
122+
client
123+
.createLogStream(
124+
CreateLogStreamRequest.builder()
125+
.logGroupName(logGroup)
126+
.logStreamName(logStream)
127+
.build())
128+
.join(),
129+
"stream",
130+
logStream);
131+
}
132+
133+
private static void ensureResourceExists(
134+
Supplier<Boolean> existsCheck,
135+
Runnable createAction,
136+
String resourceType,
137+
String resourceName) {
138+
if (existsCheck.get()) {
139+
LOGGER.debug("Log {} [{}] already exists", resourceType, resourceName);
140+
} else {
141+
LOGGER.debug("Attempting to create log {}: {}", resourceType, resourceName);
142+
createAction.run();
143+
}
144+
}
145+
146+
@PreDestroy
147+
void shutdown() {
148+
if (client != null) {
149+
client.close();
150+
client = null;
151+
}
152+
}
153+
154+
@Override
155+
protected void transformAndSendEvent(HashMap<String, Object> properties) {
156+
properties.put("realm_id", callContext.getRealmContext().getRealmIdentifier());
157+
properties.put("principal", securityContext.getUserPrincipal().getName());
158+
properties.put(
159+
"activated_roles", ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles());
160+
// TODO: Add request ID when it is available
161+
String eventAsJson;
162+
try {
163+
eventAsJson = objectMapper.writeValueAsString(properties);
164+
} catch (JsonProcessingException e) {
165+
LOGGER.error("Error processing event into JSON string: ", e);
166+
LOGGER.debug("Failed to convert the following object into JSON string: {}", properties);
167+
return;
168+
}
169+
InputLogEvent inputLogEvent =
170+
InputLogEvent.builder().message(eventAsJson).timestamp(clock.millis()).build();
171+
PutLogEventsRequest.Builder requestBuilder =
172+
PutLogEventsRequest.builder()
173+
.logGroupName(logGroup)
174+
.logStreamName(logStream)
175+
.logEvents(List.of(inputLogEvent));
176+
CompletableFuture<PutLogEventsResponse> future =
177+
client
178+
.putLogEvents(requestBuilder.build())
179+
.whenComplete(
180+
(resp, err) -> {
181+
if (err != null) {
182+
LOGGER.error(
183+
"Error writing log to CloudWatch. Event: {}, Error: ", inputLogEvent, err);
184+
}
185+
});
186+
if (synchronousMode) {
187+
future.join();
188+
}
189+
}
190+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.polaris.service.quarkus.events.jsonEventListener.aws.cloudwatch;
20+
21+
import io.quarkus.runtime.annotations.StaticInitSafe;
22+
import io.smallrye.config.ConfigMapping;
23+
import io.smallrye.config.WithDefault;
24+
import io.smallrye.config.WithName;
25+
import jakarta.enterprise.context.ApplicationScoped;
26+
import org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch.AwsCloudWatchConfiguration;
27+
28+
/**
29+
* Quarkus-specific configuration interface for AWS CloudWatch event listener integration.
30+
*
31+
* <p>This interface extends the base {@link AwsCloudWatchConfiguration} and provides
32+
* Quarkus-specific configuration mappings for AWS CloudWatch logging functionality.
33+
*/
34+
@StaticInitSafe
35+
@ConfigMapping(prefix = "polaris.event-listener.aws-cloudwatch")
36+
@ApplicationScoped
37+
public interface QuarkusAwsCloudWatchConfiguration extends AwsCloudWatchConfiguration {
38+
39+
/**
40+
* Returns the AWS CloudWatch log group name for event logging.
41+
*
42+
* <p>The log group is a collection of log streams that share the same retention, monitoring, and
43+
* access control settings. If not specified, defaults to "polaris-cloudwatch-default-group".
44+
*
45+
* <p>Configuration property: {@code polaris.event-listener.aws-cloudwatch.log-group}
46+
*
47+
* @return a String containing the log group name, or the default value if not configured
48+
*/
49+
@WithName("log-group")
50+
@WithDefault("polaris-cloudwatch-default-group")
51+
@Override
52+
String awsCloudWatchLogGroup();
53+
54+
/**
55+
* Returns the AWS CloudWatch log stream name for event logging.
56+
*
57+
* <p>A log stream is a sequence of log events that share the same source. Each log stream belongs
58+
* to one log group. If not specified, defaults to "polaris-cloudwatch-default-stream".
59+
*
60+
* <p>Configuration property: {@code polaris.event-listener.aws-cloudwatch.log-stream}
61+
*
62+
* @return a String containing the log stream name, or the default value if not configured
63+
*/
64+
@WithName("log-stream")
65+
@WithDefault("polaris-cloudwatch-default-stream")
66+
@Override
67+
String awsCloudWatchLogStream();
68+
69+
/**
70+
* Returns the AWS region where CloudWatch logs should be sent.
71+
*
72+
* <p>This specifies the AWS region for the CloudWatch service endpoint. The region must be a
73+
* valid AWS region identifier. If not specified, defaults to "us-east-1".
74+
*
75+
* <p>Configuration property: {@code polaris.event-listener.aws-cloudwatch.region}
76+
*
77+
* @return a String containing the AWS region, or the default value if not configured
78+
*/
79+
@WithName("region")
80+
@WithDefault("us-east-1")
81+
@Override
82+
String awsCloudWatchRegion();
83+
84+
/**
85+
* Returns the synchronous mode setting for CloudWatch logging.
86+
*
87+
* <p>When set to "true", log events are sent to CloudWatch synchronously, which may impact
88+
* application performance but ensures immediate delivery. When set to "false" (default), log
89+
* events are sent asynchronously for better performance.
90+
*
91+
* <p>Configuration property: {@code polaris.event-listener.aws-cloudwatch.synchronous-mode}
92+
*
93+
* @return a boolean value indicating the synchronous mode setting
94+
*/
95+
@WithName("synchronous-mode")
96+
@WithDefault("false")
97+
@Override
98+
boolean synchronousMode();
99+
}

0 commit comments

Comments
 (0)