diff --git a/v2/sourcedb-to-spanner/pom.xml b/v2/sourcedb-to-spanner/pom.xml
index 93ff91864a..575fd053df 100644
--- a/v2/sourcedb-to-spanner/pom.xml
+++ b/v2/sourcedb-to-spanner/pom.xml
@@ -158,5 +158,11 @@
4.1
compile
+
+ org.bouncycastle
+ bcpkix-jdk15on
+ 1.60
+ test
+
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/EmbeddedCassandra.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/EmbeddedCassandra.java
index 244b6d4d93..4960a4f627 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/EmbeddedCassandra.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/EmbeddedCassandra.java
@@ -22,16 +22,45 @@
import com.github.nosan.embedded.cassandra.commons.ClassPathResource;
import com.github.nosan.embedded.cassandra.cql.CqlScript;
import com.google.common.collect.ImmutableList;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.math.BigInteger;
import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+import java.util.Date;
import javax.annotation.Nullable;
-import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x509.BasicConstraints;
+import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.cert.X509v3CertificateBuilder;
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.operator.ContentSigner;
+import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder;
/**
* Utility Class to start and stop Embedded Cassandra. {@link Cassandra Embedded Cassandra} is
* equivalent to real cassandra at the level of network protocol. So using this over mocks wherever
* possible gives us much better test coverage. Note: Prefer using {@link SharedEmbeddedCassandra}
* to share an instance of Embedded Cassandra.
+ *
Note on SSL Mode:
+ *
+ *
When the test Cassandra Server has to run with SSL enabled, it needs to present an SSL certificate to the client,
+ * which the client can verify.
+ * In a UT environment, we won't have a certificate authority that will sign the certificates.
+ * For this, We can either check in a private Key and Cert to the repo itself which is used in UT, which is less ideal, or,
+ * We can generate a temporary random key and certificate which would be used by the server and trusted by the client in a UT setting.
+ * We are taking the later route in order to avoid having to check in keys and certificates to the repo.
+ *
*/
public class EmbeddedCassandra implements AutoCloseable {
private Cassandra embeddedCassandra;
@@ -39,24 +68,51 @@ public class EmbeddedCassandra implements AutoCloseable {
private ImmutableList contactPoints;
private final Settings settings;
private static final String LOCAL_DATA_CENTER = "datacenter1";
+ /**
+ * Temporary file for storing the certificate key.
+ */
+ private java.io.File keyStoreFile = null;
+ /**
+ * Temporary file for storing the certificate.
+ */
+ private java.io.File trustStoreFile = null;
- public EmbeddedCassandra(String config, @Nullable String cqlResource) throws IOException {
+ public EmbeddedCassandra(String config, @Nullable String cqlResource, boolean clientEncryption)
+ throws IOException {
var builder =
new CassandraBuilder()
.addEnvironmentVariable("JAVA_HOME", System.getProperty("java.home"))
.addEnvironmentVariable("JRE_HOME", System.getProperty("jre.home"))
// Check [CASSANDRA-13396](https://issues.apache.org/jira/browse/CASSANDRA-13396)
.addSystemProperty("cassandra.insecure.udf", "true")
- .configFile(new ClassPathResource(config));
+ .configFile(new ClassPathResource(config))
+ // Choose from available ports on the test machine.
+ .addConfigProperty("native_transport_port", 0)
+ .addConfigProperty("storage_port", 0)
+ .addSystemProperty("cassandra.jmx.local.port", 0)
+ .registerShutdownHook(true);
+ if (clientEncryption) {
+
+ // Generate temporary keystore and truststore files
+ keyStoreFile = java.io.File.createTempFile("client", ".keystore");
+ trustStoreFile = java.io.File.createTempFile("client", ".truststore");
+ builder =
+ builder
+ .addConfigProperty("client_encryption_options.enabled", true)
+ .addConfigProperty("client_encryption_options.optional", true)
+ .addConfigProperty(
+ "client_encryption_options.keystore", keyStoreFile.getAbsolutePath());
+ createTemporaryKeyStore(keyStoreFile, trustStoreFile);
+ }
// Ref: https://stackoverflow.com/questions/78195798/embedded-cassandra-not-working-in-java-21
if (Runtime.version().compareTo(Runtime.Version.parse("12")) >= 0) {
builder = builder.addSystemProperty("java.security.manager", "allow");
}
/*
- * TODO (vardhanvthigle): Get EmbeddedCassandea 4.0 working with our UT JVM.
+ * TODO (vardhanvthigle): Get EmbeddedCassandra 4.0 working with our UT JVM.
// If we spawn Cassandra 4.0.0 for testing, it tries to set biased locking, which is not recognized by some JVMs.
builder = builder.addJvmOptions("-XX:+IgnoreUnrecognizedVMOptions");
- // This is needed as Cassnadra 4.0 goes for deep reflections for java pacakges.
+ // This is needed as Cassandra 4.0 goes for deep reflections for java packages.
builder = builder.addEnvironmentVariable("JDK_JAVA_OPTIONS", "--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED"
+ "--add-opens java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED");
builder = builder.version("4.0.15");
@@ -78,6 +134,97 @@ public EmbeddedCassandra(String config, @Nullable String cqlResource) throws IOE
}
}
+ /**
+ * Generate a Random KeyPair for Signing the SSL certificate in UT environment.
+ */
+ private static KeyPair generateTestKeyPair() throws NoSuchAlgorithmException {
+ KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
+ keyPairGenerator.initialize(2048);
+ return keyPairGenerator.generateKeyPair();
+ }
+
+ /**
+ * Generate a random Key Pair and a Self Signed Certificate for the UT environment.
+ */
+ private static void createTemporaryKeyStore(
+ java.io.File keyStoreFile, java.io.File trustStoreFile) {
+ Security.addProvider(new BouncyCastleProvider());
+
+ try {
+ // Generate KeyPair
+ KeyPair keyPair = generateTestKeyPair();
+
+ // Generate Certificate
+ X509Certificate certificate = generateTestCertificate(keyPair);
+
+ // Create and save keystore
+ createKeyStore(keyStoreFile, keyPair, certificate, "cassandra".toCharArray());
+
+ // Create and save truststore
+ createTrustStore(trustStoreFile, certificate, "cassandra".toCharArray());
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void createKeyStore(
+ java.io.File keyStoreFile, KeyPair keyPair, X509Certificate certificate, char[] password)
+ throws Exception {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(null, null);
+ keyStore.setKeyEntry(
+ "client",
+ keyPair.getPrivate(),
+ password,
+ new java.security.cert.Certificate[] {certificate});
+ try (FileOutputStream fos = new FileOutputStream(keyStoreFile)) {
+ keyStore.store(fos, password);
+ }
+ }
+
+ private static void createTrustStore(
+ java.io.File trustStoreFile, X509Certificate certificate, char[] password) throws Exception {
+ KeyStore trustStore = KeyStore.getInstance("JKS");
+ trustStore.load(null, null);
+ trustStore.setCertificateEntry("localhost", certificate);
+ try (FileOutputStream fos = new FileOutputStream(trustStoreFile)) {
+ trustStore.store(fos, password);
+ }
+ }
+
+ /**
+ * Generate a selfsigned test certificate.
+ */
+ private static X509Certificate generateTestCertificate(KeyPair keyPair) throws Exception {
+ // Prepare necessary information
+ X500Name issuer = new X500Name("CN=localhost");
+ BigInteger serial = new BigInteger(160, new SecureRandom());
+ Date notBefore = new Date();
+ Date notAfter = new Date(notBefore.getTime() + 365 * 24 * 60 * 60 * 1000L); // 1 year validity
+ X500Name subject = issuer;
+ SubjectPublicKeyInfo publicKeyInfo =
+ SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());
+
+ // Create certificate builder
+ X509v3CertificateBuilder certBuilder =
+ new X509v3CertificateBuilder(issuer, serial, notBefore, notAfter, subject, publicKeyInfo);
+
+ // Add Basic Constraints (optional, for CA certificates)
+ certBuilder.addExtension(
+ org.bouncycastle.asn1.x509.Extension.basicConstraints, true, new BasicConstraints(true));
+
+ // Create content signer
+ ContentSigner contentSigner =
+ new JcaContentSignerBuilder("SHA256WithRSAEncryption").build(keyPair.getPrivate());
+
+ // Build the certificate holder
+ X509CertificateHolder certHolder = certBuilder.build(contentSigner);
+
+ // Convert to X509Certificate
+ return new JcaX509CertificateConverter().getCertificate(certHolder);
+ }
+
public Cassandra getEmbeddedCassandra() {
return embeddedCassandra;
}
@@ -98,10 +245,27 @@ public ImmutableList getContactPoints() {
return this.contactPoints;
}
+ public Path getKeyStorePath() {
+ return this.keyStoreFile.toPath();
+ }
+
+ public Path getTrustStorePath() {
+ return this.trustStoreFile.toPath();
+ }
+
@Override
public void close() throws Exception {
if (embeddedCassandra != null) {
embeddedCassandra.stop();
}
+
+ if (keyStoreFile != null && keyStoreFile.exists()) {
+ keyStoreFile.delete();
+ keyStoreFile = null;
+ }
+ if (trustStoreFile != null && trustStoreFile.exists()) {
+ trustStoreFile.delete();
+ trustStoreFile = null;
+ }
}
}
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/SharedEmbeddedCassandra.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/SharedEmbeddedCassandra.java
index 97386af89d..edc289f515 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/SharedEmbeddedCassandra.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/SharedEmbeddedCassandra.java
@@ -46,13 +46,19 @@ public class SharedEmbeddedCassandra implements AutoCloseable {
*
* @param config - config.yaml
* @param cqlResource - cql script.
+ * @param clientEncryption - set to true if Client side SSL is needed.
* @throws IOException
*/
- public SharedEmbeddedCassandra(String config, @Nullable String cqlResource) throws IOException {
- this.config = Configuration.create(config, cqlResource);
+ public SharedEmbeddedCassandra(
+ String config, @Nullable String cqlResource, Boolean clientEncryption) throws IOException {
+ this.config = Configuration.create(config, cqlResource, clientEncryption);
this.embeddedCassandra = getEmbeddedCassandra(this.config);
}
+ public SharedEmbeddedCassandra(String config, @Nullable String cqlResource) throws IOException {
+ this(config, cqlResource, Boolean.FALSE);
+ }
+
/**
* Get a reference to {@link com.github.nosan.embedded.cassandra.Cassandra Embedded Cassandra}
* managed by {@link SharedEmbeddedCassandra}.
@@ -90,7 +96,10 @@ private static EmbeddedCassandra getEmbeddedCassandra(Configuration configuratio
} else {
Log.info("Starting Shared embedded Cassandra for configuration = {}", configuration);
embeddedCassandra =
- new EmbeddedCassandra(configuration.configYaml(), configuration.cqlScript());
+ new EmbeddedCassandra(
+ configuration.configYaml(),
+ configuration.cqlScript(),
+ configuration.clientEncryption());
RefCountedEmbeddedCassandra refCountedEmbeddedCassandra =
RefCountedEmbeddedCassandra.create(embeddedCassandra);
refCountedEmbeddedCassandra.refIncrementAndGet();
@@ -123,8 +132,10 @@ private static void putEmbeddedCassandra(Configuration configuration) throws Exc
abstract static class Configuration {
public AtomicInteger refCount = new AtomicInteger();
- public static Configuration create(String configYaml, String cqlScript) {
- return new AutoValue_SharedEmbeddedCassandra_Configuration(configYaml, cqlScript);
+ public static Configuration create(
+ String configYaml, String cqlScript, Boolean clientEncryption) {
+ return new AutoValue_SharedEmbeddedCassandra_Configuration(
+ configYaml, cqlScript, clientEncryption);
}
@Nullable
@@ -132,6 +143,9 @@ public static Configuration create(String configYaml, String cqlScript) {
@Nullable
public abstract String cqlScript();
+
+ @Nullable
+ public abstract Boolean clientEncryption();
}
// This is a private class, and it must be ensured that refcounting is synchronized.
diff --git a/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/basicConfig.yaml b/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/basicConfig.yaml
index 1c150f178f..0d71466bf6 100644
--- a/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/basicConfig.yaml
+++ b/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/basicConfig.yaml
@@ -450,7 +450,7 @@ seed_provider:
parameters:
# seeds is actually a comma-delimited list of addresses.
# Ex: ",,"
- - seeds: "127.0.0.1:7000"
+ - seeds: "127.0.0.1"
# For workloads with more data than can fit in memory, Cassandra's
# bottleneck will be reads that need to fetch data from