Skip to content

Commit

Permalink
#2019 configure pekko-persistence with AWS IAM based auth
Browse files Browse the repository at this point in the history
* provide own "driver" for the pekko-persistence plugin
* configure AWS IAM based authentication based on the Ditto configuration
* moved common AWS MongoCredential obtaining logic to new AwsAuthenticationHelper class
  • Loading branch information
hu-ahmed authored and thjaeckle committed Sep 17, 2024
1 parent 44729da commit f3d9fdb
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 69 deletions.
4 changes: 2 additions & 2 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
<!-- Pekko -->
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-serialization-jackson_2.13</artifactId>
<artifactId>pekko-serialization-jackson_${scala.version}</artifactId>
<version>${pekko-bom.version}</version>
</dependency>
<dependency>
Expand Down Expand Up @@ -219,7 +219,7 @@
</dependency>
<dependency>
<groupId>org.mongodb.scala</groupId>
<artifactId>mongo-scala-driver_2.13</artifactId>
<artifactId>mongo-scala-driver_${scala.version}</artifactId>
<version>${mongo-java-driver.version}</version>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ sharding-dispatcher {
}

pekko.contrib.persistence.mongodb.mongo {
driver = "pekko.contrib.persistence.mongodb.driver.ScalaDriverPersistenceExtension"
driver = "org.eclipse.ditto.internal.utils.persistence.pekko.CustomizableScalaDriverPersistenceExtension"

# Write concerns are one of: Unacknowledged, Acknowledged, Journaled, ReplicaAcknowledged
journal-write-concern = "Acknowledged" # By default was: "Journaled"
Expand Down
2 changes: 1 addition & 1 deletion internal/utils/extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<dependencies>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor_2.13</artifactId>
<artifactId>pekko-actor_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
Expand Down
76 changes: 62 additions & 14 deletions internal/utils/persistence/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,27 @@
</dependency>
<dependency>
<groupId>org.mongodb.scala</groupId>
<artifactId>mongo-scala-driver_2.13</artifactId>
<artifactId>mongo-scala-driver_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence-query_${scala.version}</artifactId>
</dependency>
<dependency>
<!-- This dependency is required to build the MongoDriverConfigurableScalaDriverPersistenceExtension. Otherwise
java.lang.ClassNotFoundException: com.codahale.metrics.MetricRegistry is thrown -->
<groupId>nl.grons</groupId>
<artifactId>metrics4-scala_${scala.version}</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
Expand All @@ -88,7 +106,7 @@
<!-- Can be removed after dropping support for scala 2.12 and use classes under scala.jdk instead -->
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_2.13</artifactId>
<artifactId>scala-java8-compat_${scala.version}</artifactId>
</dependency>


Expand Down Expand Up @@ -158,22 +176,52 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-persistence-query_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<scalaVersion>${scala.full.version}</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,20 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.net.ssl.SSLContext;

import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.ditto.internal.utils.persistence.mongo.auth.AwsAuthenticationHelper;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.AwsCredential;
import com.mongodb.ClientSessionOptions;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
Expand All @@ -58,22 +54,13 @@

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
import software.amazon.awssdk.services.sts.model.Credentials;

/**
* Default implementation of DittoMongoClient.
*/
@NotThreadSafe
public final class MongoClientWrapper implements DittoMongoClient {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoClientWrapper.class);

private final MongoClient mongoClient;
private final MongoClientSettings clientSettings;
private final MongoDatabase defaultDatabase;
Expand Down Expand Up @@ -483,9 +470,10 @@ public MongoClientWrapper build() {
buildAndApplySslSettings();

if (isUseAwsIamRole) {
applyAwsIamRoleSettings();
mongoClientSettingsBuilder.credential(
AwsAuthenticationHelper.provideAwsIamBasedMongoCredential(awsRegion, awsRoleArn, awsSessionName)
);
}

return new MongoClientWrapper(mongoClientSettingsBuilder.build(), defaultDatabaseName,
dittoMongoClientSettingsBuilder.build(), eventLoopGroup);
}
Expand Down Expand Up @@ -521,38 +509,5 @@ private static SSLContext createAndInitSslContext() throws NoSuchAlgorithmExcept
result.init(null, null, null);
return result;
}

private void applyAwsIamRoleSettings() {

final StsClientBuilder stsClientBuilder = StsClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create());
if (!awsRegion.isEmpty()) {
stsClientBuilder.region(Region.of(awsRegion));
}

final Supplier<AwsCredential> awsFreshCredentialSupplier;
try (final StsClient stsClient = stsClientBuilder.build()) {
awsFreshCredentialSupplier = () -> {
LOGGER.info("Supplying AWS IAM credentials, assuming role <{}> in session name <{}>",
awsRoleArn, awsSessionName);

// assume role using the AWS SDK
final AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
.roleArn(awsRoleArn)
.roleSessionName(awsSessionName)
.build();
final AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
final Credentials awsCredentials = roleResponse.credentials();

return new AwsCredential(awsCredentials.accessKeyId(), awsCredentials.secretAccessKey(),
awsCredentials.sessionToken());
};

final MongoCredential credential = MongoCredential.createAwsCredential(null, null)
.withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);

mongoClientSettingsBuilder.credential(credential);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.persistence.mongo.auth;

import java.util.function.Supplier;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.AwsCredential;
import com.mongodb.MongoCredential;

import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
import software.amazon.awssdk.services.sts.model.Credentials;

/**
* Helper class to obtain {@link MongoCredential}s when running Ditto in AWS.
*/
public final class AwsAuthenticationHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(AwsAuthenticationHelper.class);

private AwsAuthenticationHelper() {
throw new AssertionError();
}

/**
* Obtains a {@link MongoCredential} based on AWS IAM "obtaining role" based authentication.
*
* @param awsRegion the optional to-be-configured AWS region
* @param awsRoleArn the role ARN to obtain
* @param awsSessionName the session name to use
* @return the MongoCredential prepared to authenticate via AWS IAM
*/
public static MongoCredential provideAwsIamBasedMongoCredential(
@Nullable final String awsRegion,
final String awsRoleArn,
final String awsSessionName
) {
final StsClientBuilder stsClientBuilder = StsClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create());
if (awsRegion != null && !awsRegion.isEmpty()) {
stsClientBuilder.region(Region.of(awsRegion));
}

final Supplier<AwsCredential> awsFreshCredentialSupplier;
try (final StsClient stsClient = stsClientBuilder.build()) {
awsFreshCredentialSupplier = () -> {
LOGGER.info("Supplying AWS IAM credentials, assuming role <{}> in session name <{}>",
awsRoleArn, awsSessionName);

// assume role using the AWS SDK
final AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
.roleArn(awsRoleArn)
.roleSessionName(awsSessionName)
.build();
final AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
final Credentials awsCredentials = roleResponse.credentials();

return new AwsCredential(awsCredentials.accessKeyId(), awsCredentials.secretAccessKey(),
awsCredentials.sessionToken());
};

return MongoCredential.createAwsCredential(null, null)
.withMechanismProperty(MongoCredential.AWS_CREDENTIAL_PROVIDER_KEY, awsFreshCredentialSupplier);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.persistence.pekko

import com.mongodb.MongoCredential
import com.typesafe.config.Config
import org.apache.pekko.actor.ActorSystem
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig
import org.eclipse.ditto.internal.utils.persistence.mongo.auth.AwsAuthenticationHelper
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig
import pekko.contrib.persistence.mongodb.driver.{ScalaDriverPersistenceJournaller, ScalaDriverPersistenceReadJournaller, ScalaDriverPersistenceSnapshotter, ScalaMongoDriver}
import pekko.contrib.persistence.mongodb.{ConfiguredExtension, MongoPersistenceExtension, MongoPersistenceJournalMetrics, MongoPersistenceJournallingApi}

/**
* An adjustment of the original pekko-persistence
* [[pekko.contrib.persistence.mongodb.driver.ScalaDriverPersistenceExtension]] which can be customized in a way to
* overwrite configuration of the used [[ScalaMongoDriver]].
* Creates an instance of [[CustomizableScalaMongoDriver]] when a custom [[MongoCredential]] should be provided
* to the driver in order to authenticate.
*
* @param actorSystem the ActorSystem in which the extension was loaded
*/
class CustomizableScalaDriverPersistenceExtension(val actorSystem: ActorSystem)
extends MongoPersistenceExtension(actorSystem) {

override def configured(config: Config): Configured = Configured(config)

case class Configured(config: Config) extends ConfiguredExtension {

val driver: ScalaMongoDriver = {
val mongoDbConfig = DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(actorSystem.settings.config))
val optionsConfig = mongoDbConfig.getOptionsConfig

if (optionsConfig.isUseAwsIamRole) {
val mongoCredential = AwsAuthenticationHelper.provideAwsIamBasedMongoCredential(
optionsConfig.awsRegion(),
optionsConfig.awsRoleArn(),
optionsConfig.awsSessionName()
)
new CustomizableScalaMongoDriver(actorSystem, config, builder => builder.credential(mongoCredential))
} else {
new ScalaMongoDriver(actorSystem, config)
}
}

override lazy val journaler: MongoPersistenceJournallingApi = new ScalaDriverPersistenceJournaller(driver)
with MongoPersistenceJournalMetrics {
override def driverName = "scala-official"
}
override lazy val snapshotter = new ScalaDriverPersistenceSnapshotter(driver)

override lazy val readJournal = new ScalaDriverPersistenceReadJournaller(driver)
}

}
Loading

0 comments on commit f3d9fdb

Please sign in to comment.