Skip to content

Commit

Permalink
Cassandra metadata PR To accomodate Driver Loader Class (#33)
Browse files Browse the repository at this point in the history
* Removed *

* Create README.md for UDF samples (GoogleCloudPlatform#2083)

This commit adds a README.md file to the  directory. The README file provides descriptions for each of the sample Javascript UDF files in the directory, including their purpose and usage examples.

Co-authored-by: labs-code-app[bot] <161369871+labs-code-app[bot]@users.noreply.github.com>

* CassandraDriverConfigLoader from GCS (GoogleCloudPlatform#2077)

* Added Config File Path

* Added Fix for Loading Driver Options

* Added Dependecy Fixes

* Fix UT

---------

Co-authored-by: liferoad <huxiangqian@gmail.com>
Co-authored-by: labs-code-app[bot] <161369871+labs-code-app[bot]@users.noreply.github.com>
Co-authored-by: Vardhan Vinay Thigle <39047439+VardhanThigle@users.noreply.github.com>
  • Loading branch information
4 people authored Dec 28, 2024
1 parent c640d11 commit 0e3cca3
Show file tree
Hide file tree
Showing 13 changed files with 2,019 additions and 175 deletions.
35 changes: 35 additions & 0 deletions v2/common/src/main/resources/udf-samples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# UDF Samples

This directory contains sample Javascript UDFs that can be used to enrich, filter, route, and transform data in Dataflow pipelines.

## enrich.js

Adds a new field `source` with the value `pos` to the incoming JSON event.

## enrich_log.js

Adds fields to a Cloud Logging log entry received from Pub/Sub. Adds `inputSubscription` and `callingAppId` based on the log entry content.

## filter.js

Filters out incoming JSON events where the `severity` field is equal to `DEBUG`.

## route.js

Routes incoming JSON events to the dead-letter queue if they do not have a `severity` property. Throws an error with the event ID to trigger dead-letter queue routing.

## transform.js

Transforms fields of incoming JSON events. Normalizes the `source` field to lowercase, redacts `sensitiveField` to `REDACTED`, and removes `redundantField`.

## transform_csv.js

Transforms an incoming CSV line into a JSON object. The output JSON object has fields `location`, `name`, `age`, `color`, and `coffee`, mapped from the corresponding CSV columns.

## transform_log.js

Transforms Cloud Logging log entries received from Pub/Sub. If the log entry has a `textPayload`, it returns the `textPayload` as a string. Otherwise, it returns the original JSON object as a string.

## transform_log_splunk.js

Sets Splunk HTTP Event Collector (HEC) metadata for Cloud Logging log entries received from Pub/Sub. Sets the `index`, `source`, and `sourcetype` metadata fields based on the log entry content.
13 changes: 10 additions & 3 deletions v2/spanner-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,17 @@
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version> <!-- Use the latest version -->
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.12.4</version>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

import autovalue.shaded.com.google.common.collect.ImmutableList;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.google.cloud.teleport.v2.spanner.migrations.schema.*;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ColumnPK;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NameAndCols;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnDefinition;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
import com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra.SourceColumn;
import com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra.SourceSchema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.cassandra.SourceTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,159 +15,130 @@
*/
package com.google.cloud.teleport.v2.spanner.migrations.shard;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import java.util.List;
import java.util.Objects;

public class CassandraShard extends Shard {
private String keyspace;
private String consistencyLevel = "LOCAL_QUORUM";
private boolean sslOptions = false;
private String protocolVersion = "v5";
private String dataCenter = "datacenter1";
private int localPoolSize = 1024;
private int remotePoolSize = 256;

public CassandraShard(
String logicalShardId,
String host,
String port,
String user,
String password,
String keyspace,
String consistencyLevel,
Boolean sslOptions,
String protocolVersion,
String dataCenter,
Integer localPoolSize,
Integer remotePoolSize) {
super(logicalShardId, host, port, user, password, null, null, null, null);
this.keyspace = keyspace;
this.consistencyLevel = consistencyLevel;
this.sslOptions = sslOptions;
this.protocolVersion = protocolVersion;
this.dataCenter = dataCenter;
this.localPoolSize = localPoolSize;
this.remotePoolSize = remotePoolSize;
}

// Getters
public String getKeySpaceName() {
return keyspace;
}
private final DriverConfigLoader configLoader;

public String getConsistencyLevel() {
return consistencyLevel;
public CassandraShard(DriverConfigLoader configLoader) {
super(null, null, null, null, null, null, null, null, null);
this.configLoader = configLoader;
validateFields();
extractAndSetHostAndPort();
}

public boolean getSSLOptions() {
return sslOptions;
private void validateFields() {
if (getContactPoints() == null || getContactPoints().isEmpty()) {
throw new IllegalArgumentException("CONTACT_POINTS cannot be null or empty.");
}
if (getKeySpaceName() == null || getKeySpaceName().isEmpty()) {
throw new IllegalArgumentException("SESSION_KEYSPACE cannot be null or empty.");
}
}

public String getProtocolVersion() {
return protocolVersion;
}
private void extractAndSetHostAndPort() {
String firstContactPoint = getContactPoints().get(0);
String[] parts = firstContactPoint.split(":");

public String getDataCenter() {
return dataCenter;
}
if (parts.length < 2) {
throw new IllegalArgumentException("Invalid contact point format: " + firstContactPoint);
}

public int getLocalPoolSize() {
return localPoolSize;
String host = parts[0];
String port = parts[1];

setHost(host);
setPort(port);
}

public int getRemotePoolSize() {
return remotePoolSize;
private DriverExecutionProfile getProfile() {
return configLoader.getInitialConfig().getDefaultProfile();
}

// Setters
public void setKeySpaceName(String keySpaceName) {
this.keyspace = keySpaceName;
// Getters that fetch data from DriverConfigLoader
public List<String> getContactPoints() {
return getProfile().getStringList(DefaultDriverOption.CONTACT_POINTS);
}

public void setConsistencyLevel(String consistencyLevel) {
this.consistencyLevel = consistencyLevel;
public String getKeySpaceName() {
return getProfile().getString(DefaultDriverOption.SESSION_KEYSPACE);
}

public void setSslOptions(boolean sslOptions) {
this.sslOptions = sslOptions;
public String getConsistencyLevel() {
return getProfile().getString(DefaultDriverOption.REQUEST_CONSISTENCY, "LOCAL_QUORUM");
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
public boolean isSslEnabled() {
return getProfile().getBoolean(DefaultDriverOption.SSL_ENGINE_FACTORY_CLASS, false);
}

public void setDataCenter(String dataCenter) {
this.dataCenter = dataCenter;
public String getProtocolVersion() {
return getProfile().getString(DefaultDriverOption.PROTOCOL_VERSION, "V5");
}

public void setLocalPoolSize(int localPoolSize) {
this.localPoolSize = localPoolSize;
public String getDataCenter() {
return getProfile()
.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, "datacenter1");
}

public void setRemotePoolSize(int remotePoolSize) {
this.remotePoolSize = remotePoolSize;
public int getLocalPoolSize() {
return getProfile().getInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 1024);
}

public void validate() {
validateField(getHost(), "Host");
validateField(getPort(), "Port");
validateField(getUserName(), "Username");
validateField(getPassword(), "Password");
validateField(getKeySpaceName(), "Keyspace");
public int getRemotePoolSize() {
return getProfile().getInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, 256);
}

private void validateField(String value, String fieldName) {
if (value == null || value.isEmpty()) {
throw new IllegalArgumentException(fieldName + " is required");
}
public DriverConfigLoader getConfigLoader() {
return configLoader;
}

@Override
public String toString() {
return String.format(
"CassandraShard{logicalShardId='%s', host='%s', port='%s', user='%s', keySpaceName='%s', datacenter='%s', consistencyLevel='%s', protocolVersion='%s'}",
"CassandraShard{logicalShardId='%s', contactPoints=%s, keyspace='%s', consistencyLevel='%s', sslOptions=%b, protocolVersion='%s', dataCenter='%s', localPoolSize=%d, remotePoolSize=%d, host='%s', port='%s'}",
getLogicalShardId(),
getHost(),
getPort(),
getUserName(),
getContactPoints(),
getKeySpaceName(),
getDataCenter(),
getConsistencyLevel(),
getProtocolVersion());
isSslEnabled(),
getProtocolVersion(),
getDataCenter(),
getLocalPoolSize(),
getRemotePoolSize(),
getHost(),
getPort());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof CassandraShard)) return false;
CassandraShard that = (CassandraShard) o;
return sslOptions == that.sslOptions
&& localPoolSize == that.localPoolSize
&& remotePoolSize == that.remotePoolSize
&& Objects.equals(getLogicalShardId(), that.getLogicalShardId())
&& Objects.equals(getHost(), that.getHost())
&& Objects.equals(getPort(), that.getPort())
&& Objects.equals(getUserName(), that.getUserName())
&& Objects.equals(getPassword(), that.getPassword())
&& Objects.equals(keyspace, that.keyspace)
&& Objects.equals(dataCenter, that.dataCenter)
&& Objects.equals(consistencyLevel, that.consistencyLevel)
&& Objects.equals(protocolVersion, that.protocolVersion);
return Objects.equals(getContactPoints(), that.getContactPoints())
&& Objects.equals(getKeySpaceName(), that.getKeySpaceName())
&& Objects.equals(getConsistencyLevel(), that.getConsistencyLevel())
&& Objects.equals(isSslEnabled(), that.isSslEnabled())
&& Objects.equals(getProtocolVersion(), that.getProtocolVersion())
&& Objects.equals(getDataCenter(), that.getDataCenter())
&& Objects.equals(getLocalPoolSize(), that.getLocalPoolSize())
&& Objects.equals(getRemotePoolSize(), that.getRemotePoolSize());
}

@Override
public int hashCode() {
return Objects.hash(
getLogicalShardId(),
getHost(),
getPort(),
getUserName(),
getPassword(),
keyspace,
dataCenter,
consistencyLevel,
protocolVersion,
sslOptions,
localPoolSize,
remotePoolSize);
getContactPoints(),
getKeySpaceName(),
getConsistencyLevel(),
isSslEnabled(),
getProtocolVersion(),
getDataCenter(),
getLocalPoolSize(),
getRemotePoolSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,12 @@
*/
package com.google.cloud.teleport.v2.spanner.migrations.utils;

import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,8 +31,6 @@
public class CassandraConfigFileReader {

private static final Logger LOG = LoggerFactory.getLogger(CassandraConfigFileReader.class);
private static final Gson GSON =
new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.IDENTITY).create();

/**
* Reads the Cassandra configuration file from the specified GCS path and converts it into a list
Expand All @@ -49,28 +40,20 @@ public class CassandraConfigFileReader {
* @return a list containing the parsed CassandraShard.
*/
public List<Shard> getCassandraShard(String cassandraConfigFilePath) {
try (InputStream stream = getFileInputStream(cassandraConfigFilePath)) {
String configContent = IOUtils.toString(stream, StandardCharsets.UTF_8);
CassandraShard shard = GSON.fromJson(configContent, CassandraShard.class);

LOG.info("Successfully read Cassandra config: {}", shard);
try {
LOG.info("Reading Cassandra configuration from: {}", cassandraConfigFilePath);
DriverConfigLoader configLoader =
CassandraDriverConfigLoader.loadFile(cassandraConfigFilePath);
CassandraShard shard = new CassandraShard(configLoader);
LOG.info("Successfully created CassandraShard: {}", shard);
return Collections.singletonList(shard);
} catch (IOException e) {
String errorMessage =
"Failed to read Cassandra config file. Ensure it is ASCII or UTF-8 encoded and contains a well-formed JSON string.";
String.format(
"Failed to read Cassandra config file from path: %s. Ensure it exists, is accessible, and is properly formatted.",
cassandraConfigFilePath);
LOG.error(errorMessage, e);
throw new RuntimeException(errorMessage, e);
}
}

/**
* Retrieves an InputStream for the specified GCS file path.
*
* @param filePath the GCS file path.
* @return an InputStream for the file.
* @throws IOException if the file cannot be accessed or opened.
*/
private InputStream getFileInputStream(String filePath) throws IOException {
return Channels.newInputStream(FileSystems.open(FileSystems.matchNewResource(filePath, false)));
}
}
Loading

0 comments on commit 0e3cca3

Please sign in to comment.