Commit 2e7bae5

Add aws-msk-iam-auth dependency for data-analytics due to MSK kafka

MikkoKauhanen committed Dec 5, 2024
1 parent a92a7f3

Showing 8 changed files with 85 additions and 12 deletions.
aoe-data-analytics/.env (7 changes: 4 additions & 3 deletions)

@@ -12,7 +12,8 @@ mongodb.primary.database=aoe
 mongodb.primary.username=aoeuser
 mongodb.primary.password=aoepassword
 
-mongodb.primary.enable.ssl=false
+aoe.mongodb.primary.enable.ssl=false
 
-kafka.enabled=false
-spring.kafka.consumer.auto.startup=false
+aoe.kafka.enabled=true
+aoe.kafka.sasl.enable=false
+aoe.kafka.consumer.auto.startup=true
aoe-data-analytics/service-etl-processor/pom.xml (5 changes: 5 additions & 0 deletions)

@@ -53,6 +53,11 @@
             <artifactId>kafka-streams</artifactId>
             <version>3.3.1</version>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.msk</groupId>
+            <artifactId>aws-msk-iam-auth</artifactId>
+            <version>2.2.0</version>
+        </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
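A quick way to confirm the new library actually resolves into the build (a hypothetical check, not part of this commit):

    mvn -pl service-etl-processor dependency:tree | grep aws-msk-iam-auth

The aws-msk-iam-auth artifact supplies the IAMLoginModule and IAMClientCallbackHandler classes that the SASL configuration below references by name.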
KafkaConsumerConfiguration.java (18 changes: 17 additions & 1 deletion)

@@ -2,6 +2,7 @@
 
 import fi.csc.processor.model.request.MaterialActivity;
 import fi.csc.processor.model.request.SearchRequest;
+import fi.csc.processor.utils.KafkaConfigUtil;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
 import org.apache.kafka.common.serialization.StringDeserializer;

@@ -20,7 +21,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
-@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
+@ConditionalOnProperty(value = "aoe.kafka.enabled", matchIfMissing = true)
 @EnableKafka
 @Configuration
 public class KafkaConsumerConfiguration {

@@ -34,6 +35,9 @@ public class KafkaConsumerConfiguration {
     @Value(value = "${kafka.group-id.prod-search-requests}")
     private String groupSearchRequestsPrimary;
 
+    @Value(value = "${aoe.kafka.sasl.enable}")
+    private boolean saslEnabled;
+
     @Bean
     public ConsumerFactory<String, MaterialActivity> consumerFactoryMaterialActivityPrimary() {
         Map<String, Object> config = new HashMap<>();

@@ -42,9 +46,16 @@ public ConsumerFactory<String, MaterialActivity> consumerFactoryMaterialActivityPrimary() {
         config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
         config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
         config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+        if (saslEnabled) {
+            config.putAll(KafkaConfigUtil.saslConfig());
+        }
+
         return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(MaterialActivity.class));
     }
 
+
+
     @Bean
     public ConcurrentKafkaListenerContainerFactory<String, MaterialActivity> kafkaListenerMaterialActivityPrimary() {
         ConcurrentKafkaListenerContainerFactory<String, MaterialActivity> factory = new ConcurrentKafkaListenerContainerFactory<>();

@@ -60,6 +71,11 @@ public ConsumerFactory<String, SearchRequest> consumerFactorySearchRequestsPrimary() {
         config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());
         config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
         config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+        if (saslEnabled) {
+            config.putAll(KafkaConfigUtil.saslConfig());
+        }
+
         return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(SearchRequest.class));
     }
KafkaProducerConfiguration.java (25 changes: 24 additions & 1 deletion)

@@ -2,6 +2,7 @@
 
 import fi.csc.processor.model.request.MaterialActivity;
 import fi.csc.processor.model.request.SearchRequest;
+import fi.csc.processor.utils.KafkaConfigUtil;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerConfig;

@@ -20,7 +21,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
-@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
+@ConditionalOnProperty(value = "aoe.kafka.enabled", matchIfMissing = true)
 @Configuration
 public class KafkaProducerConfiguration {
 

@@ -33,12 +34,20 @@ public class KafkaProducerConfiguration {
     @Value(value = "${kafka.topic.prod-search-requests}")
     private String topicSearchRequestsPrimary;
 
+    @Value(value = "${aoe.kafka.sasl.enable}")
+    private boolean saslEnabled;
+
     @Bean
     public ProducerFactory<String, MaterialActivity> producerFactoryMaterialActivity() {
         Map<String, Object> config = new HashMap<>();
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+        if (saslEnabled) {
+            config.putAll(KafkaConfigUtil.saslConfig());
+        }
+
         return new DefaultKafkaProducerFactory<>(config);
     }
 

@@ -53,6 +62,11 @@ public ProducerFactory<String, SearchRequest> producerFactorySearchRequest() {
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+        if (saslEnabled) {
+            config.putAll(KafkaConfigUtil.saslConfig());
+        }
+
         return new DefaultKafkaProducerFactory<>(config);
     }

@@ -65,6 +79,15 @@ public KafkaTemplate<String, SearchRequest> kafkaTemplateSearchRequest() {
     public KafkaAdmin kafkaAdmin() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        if (saslEnabled) {
+            configs.putAll(KafkaConfigUtil.saslConfig());
+        }
+
+        return createKafkaAdmin(configs);
+    }
+
+    private KafkaAdmin createKafkaAdmin(Map<String, Object> configs) {
         return new KafkaAdmin(configs);
     }
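Note: with a KafkaAdmin bean registered, Spring Kafka will also create any NewTopic beans on the broker at startup (NewTopic is already imported above, though no such bean appears in this diff). A minimal sketch of what such a bean could look like, assuming the name comes from the existing kafka.topic.prod-material-activity property; partition and replication counts here are placeholder values, not part of the commit:

    @Bean
    public NewTopic materialActivityTopic(
            @Value("${kafka.topic.prod-material-activity}") String topicName) {
        // Illustrative only: one partition, replication factor 1.
        return new NewTopic(topicName, 1, (short) 1);
    }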
MongoPrimaryConfiguration.java (5 changes: 4 additions & 1 deletion)

@@ -7,6 +7,7 @@
 import com.mongodb.client.MongoClients;
 import fi.csc.processor.converter.TimeFormatConverter;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;

@@ -34,6 +35,9 @@
         mongoTemplateRef = "primaryMongoTemplate")
 public class MongoPrimaryConfiguration {
 
+    @Value("${aoe.mongodb.primary.enable.ssl}")
+    private boolean enableSsl;
+
     @Primary
     @Bean(name = "primaryProperties")
     @ConfigurationProperties("mongodb.primary")

@@ -43,7 +47,6 @@ public MongoProperties primaryProperties() {
 
     @Bean(name = "primaryMongoClient")
     public MongoClient mongoClient(@Qualifier("primaryProperties") MongoProperties mongoProperties) {
-        boolean enableSsl = Boolean.parseBoolean(System.getenv().getOrDefault("mongodb.primary.enable.ssl", "true"));
 
         MongoClientSettings.Builder builder = MongoClientSettings.builder();
 
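The hunk replaces the System.getenv lookup with the injected @Value field, but the line that consumes enableSsl falls outside the visible context. In the MongoDB Java driver the flag would typically feed the builder's SSL block; a sketch of the assumed continuation of mongoClient(...), not shown in the diff:

    // Assumed continuation: apply the injected flag to the client settings.
    MongoClientSettings settings = builder
            .applyToSslSettings(ssl -> ssl.enabled(enableSsl)) // aoe.mongodb.primary.enable.ssl
            .build();
    return MongoClients.create(settings);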
KafkaConsumer.java (6 changes: 3 additions & 3 deletions)

@@ -22,7 +22,7 @@
 import java.time.format.DateTimeFormatter;
 
 @Service
-@ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)
+@ConditionalOnProperty(value = "aoe.kafka.enabled", matchIfMissing = true)
 public class KafkaConsumer implements ConsumerSeekAware {
     private final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class.getSimpleName());
     private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")

@@ -41,7 +41,7 @@ public KafkaConsumer(MaterialActivityPrimaryRepository materialActivityPrimaryRepository,
         topics = "${kafka.topic.prod-material-activity}",
         groupId = "${kafka.group-id.prod-material-activity}",
         containerFactory = "kafkaListenerMaterialActivityPrimary",
-        autoStartup = "${spring.kafka.consumer.auto.startup}",
+        autoStartup = "${aoe.kafka.consumer.auto.startup}",
         properties = {"enable.auto.commit:false", "auto.offset.reset:latest"})
     public void consumeMaterialActivityPrimary(
         @Payload MaterialActivity materialActivity, // byte[] payload

@@ -60,7 +60,7 @@ public void consumeMaterialActivityPrimary(
         topics = "${kafka.topic.prod-search-requests}",
         groupId = "${kafka.group-id.prod-search-requests}",
         containerFactory = "kafkaListenerSearchRequestsPrimary",
-        autoStartup = "${spring.kafka.consumer.auto.startup}",
+        autoStartup = "${aoe.kafka.consumer.auto.startup}",
         properties = {"enable.auto.commit:false", "auto.offset.reset:latest"})
     public void consumeSearchRequestsPrimary(
         @Payload SearchRequest searchRequest, // byte[] payload
KafkaConfigUtil.java (new file: 23 additions & 0 deletions)

@@ -0,0 +1,23 @@
+package fi.csc.processor.utils;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Map;
+
+public class KafkaConfigUtil {
+
+    private KafkaConfigUtil() {
+        // no instance creation allowed
+    }
+
+    public static Map<String, String> saslConfig() {
+        return Map.of(
+            SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/certs/rds-truststore.jks",
+            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL",
+            SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM",
+            SaslConfigs.SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;",
+            SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
+    }
+}
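For reference, these constants expand to the standard MSK IAM client properties (the truststore path is this service's packaging choice, not an MSK requirement):

    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    ssl.truststore.location=/certs/rds-truststore.jks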
application.properties (8 changes: 5 additions & 3 deletions)

@@ -10,19 +10,21 @@ server.servlet.context-path=/api
 spring.profiles.active=dev
 spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration
 
-## MongoDB Properties (prod)
+## MongoDB Properties
+aoe.mongodb.primary.enable.ssl=false
 mongodb.primary.host=localhost
 mongodb.primary.port=27017
 mongodb.primary.database=
 mongodb.primary.username=
 mongodb.primary.password=
 
 ## Kafka Cluster Properties
-kafka.enabled=true
+aoe.kafka.enabled=true
+aoe.kafka.sasl.enable=false
 spring.kafka.consumer.bootstrap-servers=localhost:19092,localhost:19092,localhost:19092
 spring.kafka.consumer.auto-offset-reset=latest
 spring.kafka.consumer.enable-auto-commit=true
-spring.kafka.consumer.auto.startup=true
+aoe.kafka.consumer.auto.startup=true
 
 # auto-commit-interval: 5000
 spring.kafka.producer.batch-size=10
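These defaults keep SASL off for local development. Against an MSK cluster the toggle flips and the bootstrap servers point at the IAM listener; a hypothetical override (the broker host is illustrative, and 9098 is MSK's standard port for IAM-authenticated access from within AWS):

    aoe.kafka.sasl.enable=true
    spring.kafka.consumer.bootstrap-servers=b-1.mycluster.kafka.eu-west-1.amazonaws.com:9098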
