diff --git a/aoe-data-analytics/.env b/aoe-data-analytics/.env
index e8664634c..3c73a6da1 100644
--- a/aoe-data-analytics/.env
+++ b/aoe-data-analytics/.env
@@ -12,7 +12,8 @@ mongodb.primary.database=aoe
mongodb.primary.username=aoeuser
mongodb.primary.password=aoepassword
-mongodb.primary.enable.ssl=false
+aoe.mongodb.primary.enable.ssl=false
-kafka.enabled=false
-spring.kafka.consumer.auto.startup=false
+aoe.kafka.enabled=true
+aoe.kafka.sasl.enable=false
+aoe.kafka.consumer.auto.startup=true
\ No newline at end of file
diff --git a/aoe-data-analytics/service-etl-processor/pom.xml b/aoe-data-analytics/service-etl-processor/pom.xml
index 7f8c56371..c3eb9edb9 100644
--- a/aoe-data-analytics/service-etl-processor/pom.xml
+++ b/aoe-data-analytics/service-etl-processor/pom.xml
@@ -53,6 +53,11 @@
kafka-streams
3.3.1
+        <dependency>
+            <groupId>software.amazon.msk</groupId>
+            <artifactId>aws-msk-iam-auth</artifactId>
+            <version>2.2.0</version>
+        </dependency>
org.springframework.kafka
spring-kafka
diff --git a/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/KafkaConsumerConfiguration.java b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/KafkaConsumerConfiguration.java
index 52f69c8b9..48994c8d7 100644
--- a/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/KafkaConsumerConfiguration.java
+++ b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/KafkaConsumerConfiguration.java
@@ -2,6 +2,7 @@
import fi.csc.processor.model.request.MaterialActivity;
import fi.csc.processor.model.request.SearchRequest;
+import fi.csc.processor.utils.KafkaConfigUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -20,7 +21,7 @@
import java.util.HashMap;
import java.util.Map;
-@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
+@ConditionalOnProperty(value = "aoe.kafka.enabled", matchIfMissing = true)
@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {
@@ -34,6 +35,9 @@ public class KafkaConsumerConfiguration {
@Value(value = "${kafka.group-id.prod-search-requests}")
private String groupSearchRequestsPrimary;
+ @Value(value = "${aoe.kafka.sasl.enable}")
+ private boolean saslEnabled;
+
@Bean
public ConsumerFactory consumerFactoryMaterialActivityPrimary() {
Map config = new HashMap<>();
@@ -42,9 +46,16 @@ public ConsumerFactory consumerFactoryMaterialActivity
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+ if (saslEnabled) {
+ config.putAll(KafkaConfigUtil.saslConfig());
+ }
+
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(MaterialActivity.class));
}
+
+
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerMaterialActivityPrimary() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
@@ -60,6 +71,11 @@ public ConsumerFactory consumerFactorySearchRequestsPrima
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+ if (saslEnabled) {
+ config.putAll(KafkaConfigUtil.saslConfig());
+ }
+
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(SearchRequest.class));
}
diff --git a/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/KafkaProducerConfiguration.java b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/KafkaProducerConfiguration.java
index 15972d87a..052967885 100644
--- a/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/KafkaProducerConfiguration.java
+++ b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/KafkaProducerConfiguration.java
@@ -2,6 +2,7 @@
import fi.csc.processor.model.request.MaterialActivity;
import fi.csc.processor.model.request.SearchRequest;
+import fi.csc.processor.utils.KafkaConfigUtil;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -20,7 +21,7 @@
import java.util.HashMap;
import java.util.Map;
-@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
+@ConditionalOnProperty(value = "aoe.kafka.enabled", matchIfMissing = true)
@Configuration
public class KafkaProducerConfiguration {
@@ -33,12 +34,20 @@ public class KafkaProducerConfiguration {
@Value(value = "${kafka.topic.prod-search-requests}")
private String topicSearchRequestsPrimary;
+ @Value(value = "${aoe.kafka.sasl.enable}")
+ private boolean saslEnabled;
+
@Bean
public ProducerFactory producerFactoryMaterialActivity() {
Map config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+ if (saslEnabled) {
+ config.putAll(KafkaConfigUtil.saslConfig());
+ }
+
return new DefaultKafkaProducerFactory<>(config);
}
@@ -53,6 +62,11 @@ public ProducerFactory producerFactorySearchRequest() {
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+ if (saslEnabled) {
+ config.putAll(KafkaConfigUtil.saslConfig());
+ }
+
return new DefaultKafkaProducerFactory<>(config);
}
@@ -65,6 +79,15 @@ public KafkaTemplate kafkaTemplateSearchRequest() {
public KafkaAdmin kafkaAdmin() {
Map configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+ if (saslEnabled) {
+ configs.putAll(KafkaConfigUtil.saslConfig());
+ }
+
+ return createKafkaAdmin(configs);
+ }
+
+ private KafkaAdmin createKafkaAdmin(Map<String, Object> configs) {
return new KafkaAdmin(configs);
}
diff --git a/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/MongoPrimaryConfiguration.java b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/MongoPrimaryConfiguration.java
index ebc0fab18..bea9ddf2c 100644
--- a/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/MongoPrimaryConfiguration.java
+++ b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/configuration/MongoPrimaryConfiguration.java
@@ -7,6 +7,7 @@
import com.mongodb.client.MongoClients;
import fi.csc.processor.converter.TimeFormatConverter;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -34,6 +35,9 @@
mongoTemplateRef = "primaryMongoTemplate")
public class MongoPrimaryConfiguration {
+ @Value("${aoe.mongodb.primary.enable.ssl}")
+ private boolean enableSsl;
+
@Primary
@Bean(name = "primaryProperties")
@ConfigurationProperties("mongodb.primary")
@@ -43,7 +47,6 @@ public MongoProperties primaryProperties() {
@Bean(name = "primaryMongoClient")
public MongoClient mongoClient(@Qualifier("primaryProperties") MongoProperties mongoProperties) {
- boolean enableSsl = Boolean.parseBoolean(System.getenv().getOrDefault("mongodb.primary.enable.ssl", "true"));
MongoClientSettings.Builder builder = MongoClientSettings.builder();
diff --git a/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/consumer/KafkaConsumer.java b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/consumer/KafkaConsumer.java
index 7880b5596..dd2d5d3ed 100644
--- a/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/consumer/KafkaConsumer.java
+++ b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/consumer/KafkaConsumer.java
@@ -22,7 +22,7 @@
import java.time.format.DateTimeFormatter;
@Service
-@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
+@ConditionalOnProperty(value = "aoe.kafka.enabled", matchIfMissing = true)
public class KafkaConsumer implements ConsumerSeekAware {
private final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class.getSimpleName());
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
@@ -41,7 +41,7 @@ public KafkaConsumer(MaterialActivityPrimaryRepository materialActivityPrimaryRe
topics = "${kafka.topic.prod-material-activity}",
groupId = "${kafka.group-id.prod-material-activity}",
containerFactory = "kafkaListenerMaterialActivityPrimary",
- autoStartup = "${spring.kafka.consumer.auto.startup}",
+ autoStartup = "${aoe.kafka.consumer.auto.startup}",
properties = {"enable.auto.commit:false", "auto.offset.reset:latest"})
public void consumeMaterialActivityPrimary(
@Payload MaterialActivity materialActivity, // byte[] payload
@@ -60,7 +60,7 @@ public void consumeMaterialActivityPrimary(
topics = "${kafka.topic.prod-search-requests}",
groupId = "${kafka.group-id.prod-search-requests}",
containerFactory = "kafkaListenerSearchRequestsPrimary",
- autoStartup = "${spring.kafka.consumer.auto.startup}",
+ autoStartup = "${aoe.kafka.consumer.auto.startup}",
properties = {"enable.auto.commit:false", "auto.offset.reset:latest"})
public void consumeSearchRequestsPrimary(
@Payload SearchRequest searchRequest, // byte[] payload
diff --git a/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/utils/KafkaConfigUtil.java b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/utils/KafkaConfigUtil.java
new file mode 100644
index 000000000..331a9a743
--- /dev/null
+++ b/aoe-data-analytics/service-etl-processor/src/main/java/fi/csc/processor/utils/KafkaConfigUtil.java
@@ -0,0 +1,23 @@
+package fi.csc.processor.utils;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Map;
+
+public class KafkaConfigUtil {
+
+ private KafkaConfigUtil() {
+ // no instance creation allowed
+ }
+
+ public static Map<String, Object> saslConfig() {
+ return Map.of(
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/certs/rds-truststore.jks",
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL",
+ SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM",
+ SaslConfigs.SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;",
+ SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
+ }
+}
diff --git a/aoe-data-analytics/service-etl-processor/src/main/resources/application.properties b/aoe-data-analytics/service-etl-processor/src/main/resources/application.properties
index d1e8dd9fc..d7bdce45d 100644
--- a/aoe-data-analytics/service-etl-processor/src/main/resources/application.properties
+++ b/aoe-data-analytics/service-etl-processor/src/main/resources/application.properties
@@ -10,7 +10,8 @@ server.servlet.context-path=/api
spring.profiles.active=dev
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration
-## MongoDB Properties (prod)
+## MongoDB Properties
+aoe.mongodb.primary.enable.ssl=false
mongodb.primary.host=localhost
mongodb.primary.port=27017
mongodb.primary.database=
@@ -18,11 +19,12 @@ mongodb.primary.username=
mongodb.primary.password=
## Kafka Cluster Properties
-kafka.enabled=true
+aoe.kafka.enabled=true
+aoe.kafka.sasl.enable=false
spring.kafka.consumer.bootstrap-servers=localhost:19092,localhost:19092,localhost:19092
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
-spring.kafka.consumer.auto.startup=true
+aoe.kafka.consumer.auto.startup=true
# auto-commit-interval: 5000
spring.kafka.producer.batch-size=10