From f3d9fdb82baae1ad88a716b1f4fe999a8b5f6ea6 Mon Sep 17 00:00:00 2001
From: hu-ahmed <47451731+hu-ahmed@users.noreply.github.com>
Date: Tue, 17 Sep 2024 11:00:23 +0200
Subject: [PATCH] #2019 configure pekko-persistence with AWS IAM based auth
* provide own "driver" for the pekko-persistence plugin
* configure AWS IAM based authentication based on the Ditto configuration
* moved common AWS MongoCredential obtaining logic to new AwsAuthenticationHelper class
---
bom/pom.xml | 4 +-
.../main/resources/ditto-pekko-config.conf | 2 +-
internal/utils/extension/pom.xml | 2 +-
internal/utils/persistence/pom.xml | 76 ++++++++++++++---
.../persistence/mongo/MongoClientWrapper.java | 53 +-----------
.../mongo/auth/AwsAuthenticationHelper.java | 85 +++++++++++++++++++
...zableScalaDriverPersistenceExtension.scala | 65 ++++++++++++++
.../pekko/CustomizableScalaMongoDriver.scala | 39 +++++++++
policies/service/src/test/resources/test.conf | 2 +-
things/service/src/test/resources/test.conf | 2 +-
10 files changed, 261 insertions(+), 69 deletions(-)
create mode 100644 internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/auth/AwsAuthenticationHelper.java
create mode 100644 internal/utils/persistence/src/main/scala/org/eclipse/ditto/internal/utils/persistence/pekko/CustomizableScalaDriverPersistenceExtension.scala
create mode 100644 internal/utils/persistence/src/main/scala/org/eclipse/ditto/internal/utils/persistence/pekko/CustomizableScalaMongoDriver.scala
diff --git a/bom/pom.xml b/bom/pom.xml
index e78dc51fdf..1bf1dd7971 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -156,7 +156,7 @@
org.apache.pekko
- pekko-serialization-jackson_2.13
+ pekko-serialization-jackson_${scala.version}
${pekko-bom.version}
@@ -219,7 +219,7 @@
org.mongodb.scala
- mongo-scala-driver_2.13
+ mongo-scala-driver_${scala.version}
${mongo-java-driver.version}
diff --git a/internal/utils/config/src/main/resources/ditto-pekko-config.conf b/internal/utils/config/src/main/resources/ditto-pekko-config.conf
index 0174e5d62d..6bb4cbc672 100644
--- a/internal/utils/config/src/main/resources/ditto-pekko-config.conf
+++ b/internal/utils/config/src/main/resources/ditto-pekko-config.conf
@@ -296,7 +296,7 @@ sharding-dispatcher {
}
pekko.contrib.persistence.mongodb.mongo {
- driver = "pekko.contrib.persistence.mongodb.driver.ScalaDriverPersistenceExtension"
+ driver = "org.eclipse.ditto.internal.utils.persistence.pekko.CustomizableScalaDriverPersistenceExtension"
# Write concerns are one of: Unacknowledged, Acknowledged, Journaled, ReplicaAcknowledged
journal-write-concern = "Acknowledged" # By default was: "Journaled"
diff --git a/internal/utils/extension/pom.xml b/internal/utils/extension/pom.xml
index fc72f735a0..a952718db9 100644
--- a/internal/utils/extension/pom.xml
+++ b/internal/utils/extension/pom.xml
@@ -28,7 +28,7 @@
org.apache.pekko
- pekko-actor_2.13
+ pekko-actor_${scala.version}
org.eclipse.ditto
diff --git a/internal/utils/persistence/pom.xml b/internal/utils/persistence/pom.xml
index f456bf7ec9..afc532d17e 100755
--- a/internal/utils/persistence/pom.xml
+++ b/internal/utils/persistence/pom.xml
@@ -76,9 +76,27 @@
org.mongodb.scala
- mongo-scala-driver_2.13
+ mongo-scala-driver_${scala.version}
+
+
+ org.apache.pekko
+ pekko-persistence-query_${scala.version}
+
+
+
+ nl.grons
+ metrics4-scala_${scala.version}
+
+ software.amazon.awssdk
+ sts
+
+
+ software.amazon.awssdk
+ auth
+
com.github.ben-manes.caffeine
@@ -88,7 +106,7 @@
org.scala-lang.modules
- scala-java8-compat_2.13
+ scala-java8-compat_${scala.version}
@@ -158,22 +176,52 @@
test-jar
test
-
- org.apache.pekko
- pekko-persistence-query_${scala.version}
-
-
- software.amazon.awssdk
- sts
-
-
- software.amazon.awssdk
- auth
-
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+ ${scala.full.version}
+
+
+
+ compile
+
+ compile
+
+ compile
+
+
+ test-compile
+
+ testCompile
+
+ test-compile
+
+
+ process-resources
+
+ compile
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+ compile
+
+ compile
+
+
+
+
+
org.apache.maven.plugins
maven-jar-plugin
diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/MongoClientWrapper.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/MongoClientWrapper.java
index 174d988dd0..e2a3bd9f1e 100644
--- a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/MongoClientWrapper.java
+++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/MongoClientWrapper.java
@@ -20,7 +20,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
@@ -28,16 +27,13 @@
import org.bson.Document;
import org.bson.conversions.Bson;
+import org.eclipse.ditto.internal.utils.persistence.mongo.auth.AwsAuthenticationHelper;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.reactivestreams.Publisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.mongodb.AwsCredential;
import com.mongodb.ClientSessionOptions;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
-import com.mongodb.MongoCredential;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
@@ -58,13 +54,6 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.StsClientBuilder;
-import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
-import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
-import software.amazon.awssdk.services.sts.model.Credentials;
/**
* Default implementation of DittoMongoClient.
@@ -72,8 +61,6 @@
@NotThreadSafe
public final class MongoClientWrapper implements DittoMongoClient {
- private static final Logger LOGGER = LoggerFactory.getLogger(MongoClientWrapper.class);
-
private final MongoClient mongoClient;
private final MongoClientSettings clientSettings;
private final MongoDatabase defaultDatabase;
@@ -483,9 +470,10 @@ public MongoClientWrapper build() {
buildAndApplySslSettings();
if (isUseAwsIamRole) {
- applyAwsIamRoleSettings();
+ mongoClientSettingsBuilder.credential(
+ AwsAuthenticationHelper.provideAwsIamBasedMongoCredential(awsRegion, awsRoleArn, awsSessionName)
+ );
}
-
return new MongoClientWrapper(mongoClientSettingsBuilder.build(), defaultDatabaseName,
dittoMongoClientSettingsBuilder.build(), eventLoopGroup);
}
@@ -521,38 +509,5 @@ private static SSLContext createAndInitSslContext() throws NoSuchAlgorithmExcept
result.init(null, null, null);
return result;
}
-
- private void applyAwsIamRoleSettings() {
-
- final StsClientBuilder stsClientBuilder = StsClient.builder()
- .credentialsProvider(DefaultCredentialsProvider.create());
- if (!awsRegion.isEmpty()) {
- stsClientBuilder.region(Region.of(awsRegion));
- }
-
- final Supplier awsFreshCredentialSupplier;
- try (final StsClient stsClient = stsClientBuilder.build()) {
- awsFreshCredentialSupplier = () -> {
- LOGGER.info("Supplying AWS IAM credentials, assuming role <{}> in session name <{}>",
- awsRoleArn, awsSessionName);
-
- // assume role using the AWS SDK
- final AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
- .roleArn(awsRoleArn)
- .roleSessionName(awsSessionName)
- .build();
- final AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
- final Credentials awsCredentials = roleResponse.credentials();
-
- return new AwsCredential(awsCredentials.accessKeyId(), awsCredentials.secretAccessKey(),
- awsCredentials.sessionToken());
- };
-
- final MongoCredential credential = MongoCredential.createAwsCredential(null, null)
- .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
-
- mongoClientSettingsBuilder.credential(credential);
- }
- }
}
}
diff --git a/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/auth/AwsAuthenticationHelper.java b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/auth/AwsAuthenticationHelper.java
new file mode 100644
index 0000000000..442507f4f8
--- /dev/null
+++ b/internal/utils/persistence/src/main/java/org/eclipse/ditto/internal/utils/persistence/mongo/auth/AwsAuthenticationHelper.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.ditto.internal.utils.persistence.mongo.auth;
+
+import java.util.function.Supplier;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.AwsCredential;
+import com.mongodb.MongoCredential;
+
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+/**
+ * Helper class to obtain {@link MongoCredential}s when running Ditto in AWS.
+ */
+public final class AwsAuthenticationHelper {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AwsAuthenticationHelper.class);
+
+ private AwsAuthenticationHelper() {
+ throw new AssertionError();
+ }
+
+ /**
+ * Obtains a {@link MongoCredential} based on AWS IAM "obtaining role" based authentication.
+ *
+ * @param awsRegion the optional to-be-configured AWS region
+ * @param awsRoleArn the role ARN to obtain
+ * @param awsSessionName the session name to use
+ * @return the MongoCredential prepared to authenticate via AWS IAM
+ */
+ public static MongoCredential provideAwsIamBasedMongoCredential(
+ @Nullable final String awsRegion,
+ final String awsRoleArn,
+ final String awsSessionName
+ ) {
+ final StsClientBuilder stsClientBuilder = StsClient.builder()
+ .credentialsProvider(DefaultCredentialsProvider.create());
+ if (awsRegion != null && !awsRegion.isEmpty()) {
+ stsClientBuilder.region(Region.of(awsRegion));
+ }
+
+ final Supplier awsFreshCredentialSupplier;
+ try (final StsClient stsClient = stsClientBuilder.build()) {
+ awsFreshCredentialSupplier = () -> {
+ LOGGER.info("Supplying AWS IAM credentials, assuming role <{}> in session name <{}>",
+ awsRoleArn, awsSessionName);
+
+ // assume role using the AWS SDK
+ final AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
+ .roleArn(awsRoleArn)
+ .roleSessionName(awsSessionName)
+ .build();
+ final AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
+ final Credentials awsCredentials = roleResponse.credentials();
+
+ return new AwsCredential(awsCredentials.accessKeyId(), awsCredentials.secretAccessKey(),
+ awsCredentials.sessionToken());
+ };
+
+ return MongoCredential.createAwsCredential(null, null)
+ .withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
+ }
+ }
+}
diff --git a/internal/utils/persistence/src/main/scala/org/eclipse/ditto/internal/utils/persistence/pekko/CustomizableScalaDriverPersistenceExtension.scala b/internal/utils/persistence/src/main/scala/org/eclipse/ditto/internal/utils/persistence/pekko/CustomizableScalaDriverPersistenceExtension.scala
new file mode 100644
index 0000000000..47bb236faf
--- /dev/null
+++ b/internal/utils/persistence/src/main/scala/org/eclipse/ditto/internal/utils/persistence/pekko/CustomizableScalaDriverPersistenceExtension.scala
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.ditto.internal.utils.persistence.pekko
+
+import com.mongodb.MongoCredential
+import com.typesafe.config.Config
+import org.apache.pekko.actor.ActorSystem
+import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig
+import org.eclipse.ditto.internal.utils.persistence.mongo.auth.AwsAuthenticationHelper
+import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig
+import pekko.contrib.persistence.mongodb.driver.{ScalaDriverPersistenceJournaller, ScalaDriverPersistenceReadJournaller, ScalaDriverPersistenceSnapshotter, ScalaMongoDriver}
+import pekko.contrib.persistence.mongodb.{ConfiguredExtension, MongoPersistenceExtension, MongoPersistenceJournalMetrics, MongoPersistenceJournallingApi}
+
+/**
+ * An adjustment of the original pekko-persistence
+ * [[pekko.contrib.persistence.mongodb.driver.ScalaDriverPersistenceExtension]] which can be customized in a way to
+ * overwrite configuration of the used [[ScalaMongoDriver]].
+ * Creates an instance of [[CustomizableScalaMongoDriver]] when a custom [[MongoCredential]] should be provided
+ * to the driver in order to authenticate.
+ *
+ * @param actorSystem the ActorSystem in which the extension was loaded
+ */
+class CustomizableScalaDriverPersistenceExtension(val actorSystem: ActorSystem)
+ extends MongoPersistenceExtension(actorSystem) {
+
+ override def configured(config: Config): Configured = Configured(config)
+
+ case class Configured(config: Config) extends ConfiguredExtension {
+
+ val driver: ScalaMongoDriver = {
+ val mongoDbConfig = DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings.config))
+ val optionsConfig = mongoDbConfig.getOptionsConfig
+
+ if (optionsConfig.isUseAwsIamRole) {
+ val mongoCredential = AwsAuthenticationHelper.provideAwsIamBasedMongoCredential(
+ optionsConfig.awsRegion(),
+ optionsConfig.awsRoleArn(),
+ optionsConfig.awsSessionName()
+ )
+ new CustomizableScalaMongoDriver(actorSystem, config, builder => builder.credential(mongoCredential))
+ } else {
+ new ScalaMongoDriver(actorSystem, config)
+ }
+ }
+
+ override lazy val journaler: MongoPersistenceJournallingApi = new ScalaDriverPersistenceJournaller(driver)
+ with MongoPersistenceJournalMetrics {
+ override def driverName = "scala-official"
+ }
+ override lazy val snapshotter = new ScalaDriverPersistenceSnapshotter(driver)
+
+ override lazy val readJournal = new ScalaDriverPersistenceReadJournaller(driver)
+ }
+
+}
diff --git a/internal/utils/persistence/src/main/scala/org/eclipse/ditto/internal/utils/persistence/pekko/CustomizableScalaMongoDriver.scala b/internal/utils/persistence/src/main/scala/org/eclipse/ditto/internal/utils/persistence/pekko/CustomizableScalaMongoDriver.scala
new file mode 100644
index 0000000000..bd2d36068a
--- /dev/null
+++ b/internal/utils/persistence/src/main/scala/org/eclipse/ditto/internal/utils/persistence/pekko/CustomizableScalaMongoDriver.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2024 Contributors to the Eclipse Foundation
+ *
+ * See the NOTICE file(s) distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.ditto.internal.utils.persistence.pekko
+
+import com.typesafe.config.Config
+import org.apache.pekko.actor.ActorSystem
+import org.mongodb.scala.MongoClientSettings
+import org.mongodb.scala.MongoClientSettings.Builder
+import pekko.contrib.persistence.mongodb.driver.ScalaMongoDriver
+
+/**
+ * A customizable [[ScalaMongoDriver]] in which the MongoDB driver's [[MongoClientSettings]] can be adjusted via a
+ * provided callback for a [[MongoClientSettings.Builder]].
+ *
+ * @param system the ActorSystem
+ * @param config the config to apply
+ * @param clientSettingsBuilder a callback, providing the [[MongoClientSettings.Builder]] after pekko-persistence
+ * configuration was applied with the purpose to customize it prior to building the
+ * [[org.mongodb.scala.MongoClient]]
+ */
+class CustomizableScalaMongoDriver(system: ActorSystem, config: Config,
+ clientSettingsBuilder: Builder => Builder)
+ extends ScalaMongoDriver(system, config) {
+
+ override val mongoClientSettings: MongoClientSettings = clientSettingsBuilder
+ .apply(scalaDriverSettings.configure(mongoUri))
+ .build()
+
+}
diff --git a/policies/service/src/test/resources/test.conf b/policies/service/src/test/resources/test.conf
index 4b96f8ce8d..80e906dbd0 100755
--- a/policies/service/src/test/resources/test.conf
+++ b/policies/service/src/test/resources/test.conf
@@ -90,7 +90,7 @@ pekko {
}
pekko.contrib.persistence.mongodb.mongo {
- driver = "pekko.contrib.persistence.mongodb.driver.ScalaDriverPersistenceExtension"
+ driver = "org.eclipse.ditto.internal.utils.persistence.pekko.CustomizableScalaDriverPersistenceExtension"
}
pekko.persistence {
diff --git a/things/service/src/test/resources/test.conf b/things/service/src/test/resources/test.conf
index 7015c2f3aa..33951285bc 100755
--- a/things/service/src/test/resources/test.conf
+++ b/things/service/src/test/resources/test.conf
@@ -126,7 +126,7 @@ pekko {
}
pekko.contrib.persistence.mongodb.mongo {
- driver = "pekko.contrib.persistence.mongodb.driver.ScalaDriverPersistenceExtension"
+ driver = "org.eclipse.ditto.internal.utils.persistence.pekko.CustomizableScalaDriverPersistenceExtension"
}
pekko.persistence {