Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dml integration #51

Merged
merged 7 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public Schema() {
this.empty = true;
}

/**
 * Builds a schema from a Spanner table map and a source table map, starting with an empty set
 * of synthetic primary keys.
 *
 * <p>The schema is flagged as empty when either of the supplied maps is {@code null}.
 *
 * @param spSchema map of Spanner tables; stored as-is (no copy is made)
 * @param srcSchema map of source tables; stored as-is (no copy is made)
 */
public Schema(Map<String, SpannerTable> spSchema, Map<String, SourceTable> srcSchema) {
  boolean bothMapsPresent = spSchema != null && srcSchema != null;
  this.syntheticPKeys = new HashMap<>();
  this.srcSchema = srcSchema;
  this.spSchema = spSchema;
  // A schema missing either side is treated as empty.
  this.empty = !bothMapsPresent;
}

public Schema(
Map<String, SpannerTable> spSchema,
Map<String, SyntheticPKey> syntheticPKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public class Constants {
public static final String DEFAULT_SHARD_ID = "single_shard";

public static final String SOURCE_MYSQL = "mysql";

public static final String SOURCE_CASSANDRA = "cassandra";

// Message written to the file for filtered records
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.google.cloud.teleport.v2.templates.dbutils.processor;

import static com.google.cloud.teleport.v2.templates.constants.Constants.SOURCE_CASSANDRA;

import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventToMapConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
Expand Down Expand Up @@ -53,7 +55,8 @@ public static boolean processRecord(
String shardId,
String sourceDbTimezoneOffset,
IDMLGenerator dmlGenerator,
ISpannerMigrationTransformer spannerToSourceTransformer)
ISpannerMigrationTransformer spannerToSourceTransformer,
String source)
throws Exception {

try {
Expand Down Expand Up @@ -102,7 +105,11 @@ public static boolean processRecord(
LOG.warn("DML statement is empty for table: " + tableName);
return false;
}
dao.write(dmlGeneratorResponse.getDmlStatement());
if (source.equals(SOURCE_CASSANDRA)) {
dao.write(dmlGeneratorResponse);
} else {
dao.write(dmlGeneratorResponse.getDmlStatement());
}

Counter numRecProcessedMetric =
Metrics.counter(shardId, "records_written_to_source_" + shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.CassandraDao;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.JdbcDao;
import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraDMLGenerator;
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
import com.google.cloud.teleport.v2.templates.dbutils.dml.MySQLDMLGenerator;
import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException;
import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -54,16 +53,7 @@ public class SourceProcessorFactory {

static {
dmlGeneratorMap.put(Constants.SOURCE_MYSQL, new MySQLDMLGenerator());
dmlGeneratorMap.put(
Constants.SOURCE_CASSANDRA,
new IDMLGenerator() {
// TODO It will get removed in DML PR added Now for Test case eg: new
// CassandraDMLGenerator()
@Override
public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) {
return new DMLGeneratorResponse("");
}
});
dmlGeneratorMap.put(Constants.SOURCE_CASSANDRA, new CassandraDMLGenerator());

connectionHelperMap.put(Constants.SOURCE_MYSQL, new JdbcConnectionHelper());
connectionHelperMap.put(Constants.SOURCE_CASSANDRA, new CassandraConnectionHelper());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void processElement(ProcessContext c) throws Exception {
String qualifiedShard = "";
String tableName = record.getTableName();
String keysJsonStr = record.getMod().getKeysJson();
long finalKey;

try {
if (shardingMode.equals(Constants.SHARDING_MODE_SINGLE_SHARD)) {
Expand Down Expand Up @@ -231,9 +232,7 @@ public void processElement(ProcessContext c) throws Exception {

record.setShard(qualifiedShard);
String finalKeyString = tableName + "_" + keysJsonStr + "_" + qualifiedShard;
Long finalKey =
finalKeyString.hashCode() % maxConnectionsAcrossAllShards; // The total parallelism is
// maxConnectionsAcrossAllShards
finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
c.output(KV.of(finalKey, record));

} catch (Exception e) {
Expand All @@ -242,7 +241,7 @@ public void processElement(ProcessContext c) throws Exception {
LOG.error("Error fetching shard Id column: " + e.getMessage() + ": " + errors.toString());
// The record has no shard hence will be sent to DLQ in subsequent steps
String finalKeyString = record.getTableName() + "_" + keysJsonStr + "_" + skipDirName;
Long finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
c.output(KV.of(finalKey, record));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ public void processElement(ProcessContext c) {
shardId,
sourceDbTimezoneOffset,
sourceProcessor.getDmlGenerator(),
spannerToSourceTransformer);
spannerToSourceTransformer,
this.source);
if (isEventFiltered) {
outputWithTag(c, Constants.FILTERED_TAG, Constants.FILTERED_TAG_MESSAGE, spannerRec);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.dml;

import static org.junit.Assert.assertTrue;

import com.google.cloud.teleport.v2.spanner.migrations.schema.ColumnPK;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NameAndCols;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceTable;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerColumnDefinition;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerColumnType;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerTable;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SyntheticPKey;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import java.util.HashMap;
import java.util.Map;
import org.json.JSONObject;
import org.junit.Test;

/**
 * Unit tests for {@link CassandraDMLGenerator} covering inputs for which the generator is expected
 * to return an empty DML statement, plus static factories for a small hand-built sample schema.
 */
public class CassandraDMLGeneratorTest {

  /** Session file each test loads its {@link Schema} from. */
  private static final String SESSION_FILE =
      "src/test/resources/CassandraJson/cassandraAllDatatypeSession.json";

  /** New-values payload shared by every test case. */
  private static final String NEW_VALUES = "{\"FirstName\":\"kk\",\"LastName\":\"ll\"}";

  /** Modification type used by every test case. */
  private static final String MOD_TYPE_INSERT = "INSERT";

  /** Source database timezone offset handed to the request builder. */
  private static final String TIMEZONE_OFFSET = "+00:00";

  /** The keys JSON names a column ("SomeRandomName") other than the one the other tests use. */
  @Test
  public void primaryKeyNotFoundInJson() {
    String dml = generateInsertDml("Singers", "{\"SomeRandomName\":\"999\"}");

    assertTrue(dml.isEmpty());
  }

  /**
   * The keys JSON carries "SingerId" for the "Singers" table.
   *
   * <p>NOTE(review): judging by the test name, the session file's source schema presumably lacks
   * this primary key - confirm against the fixture.
   */
  @Test
  public void primaryKeyNotPresentInSourceSchema() {
    String dml = generateInsertDml("Singers", "{\"SingerId\":\"999\"}");

    assertTrue(dml.isEmpty());
  }

  /** The request targets the table name "SomeRandomTableNotInSchema". */
  @Test
  public void testSpannerTableNotInSchema() {
    String dml = generateInsertDml("SomeRandomTableNotInSchema", "{\"SingerId\":\"999\"}");

    assertTrue(dml.isEmpty());
  }

  /**
   * Reads the session file, builds an INSERT request for the given table and key JSON, runs it
   * through a fresh {@link CassandraDMLGenerator} and returns the resulting DML statement.
   *
   * @param tableName table name placed in the request
   * @param keysJson JSON text of the key values placed in the request
   * @return the DML statement carried by the generator's response
   */
  private static String generateInsertDml(String tableName, String keysJson) {
    Schema sessionSchema = SessionFileReader.read(SESSION_FILE);
    JSONObject rowValues = new JSONObject(NEW_VALUES);
    JSONObject keyValues = new JSONObject(keysJson);

    CassandraDMLGenerator generator = new CassandraDMLGenerator();
    DMLGeneratorRequest request =
        new DMLGeneratorRequest.Builder(
                MOD_TYPE_INSERT, tableName, rowValues, keyValues, TIMEZONE_OFFSET)
            .setSchema(sessionSchema)
            .build();
    DMLGeneratorResponse response = generator.getDMLStatement(request);
    return response.getDmlStatement();
  }

  /**
   * Assembles a {@link Schema} from the sample Spanner schema, empty synthetic-key and source
   * maps, and the sample Spanner-name-to-id mapping.
   *
   * @return the assembled schema
   */
  public static Schema getSchemaObject() {
    Map<String, SyntheticPKey> noSyntheticKeys = new HashMap<>();
    Map<String, SourceTable> noSourceTables = new HashMap<>();
    Map<String, SpannerTable> spannerTables = getSampleSpSchema();
    Map<String, NameAndCols> nameToId = getSampleSpannerToId();

    Schema sample = new Schema(spannerTables, noSyntheticKeys, noSourceTables);
    sample.setSpannerToID(nameToId);
    return sample;
  }

  /**
   * Builds a one-table Spanner schema: table id "t1" named "tableName", with columns c1-c4 and a
   * primary key on c1.
   *
   * @return map from table id to {@link SpannerTable}
   */
  public static Map<String, SpannerTable> getSampleSpSchema() {
    Map<String, SpannerColumnDefinition> columnsById = new HashMap<>();
    columnsById.put("c1", column("accountId", "STRING"));
    columnsById.put("c2", column("accountName", "STRING"));
    columnsById.put("c3", column("migration_shard_id", "STRING"));
    columnsById.put("c4", column("accountNumber", "INT"));

    // The trailing "c3" argument is the id of the migration_shard_id column defined above.
    SpannerTable table =
        new SpannerTable(
            "tableName",
            new String[] {"c1", "c2", "c3", "c4"},
            columnsById,
            new ColumnPK[] {new ColumnPK("c1", 1)},
            "c3");

    Map<String, SpannerTable> tablesById = new HashMap<>();
    tablesById.put("t1", table);
    return tablesById;
  }

  /**
   * Builds the name-to-id mapping matching {@link #getSampleSpSchema()}: "tableName" maps to "t1"
   * and each column name maps to its c1-c4 id.
   *
   * @return map from Spanner table name to its id and column-name-to-id mapping
   */
  public static Map<String, NameAndCols> getSampleSpannerToId() {
    Map<String, String> columnIdsByName = new HashMap<>();
    columnIdsByName.put("accountId", "c1");
    columnIdsByName.put("accountName", "c2");
    columnIdsByName.put("migration_shard_id", "c3");
    columnIdsByName.put("accountNumber", "c4");

    Map<String, NameAndCols> tablesByName = new HashMap<>();
    tablesByName.put("tableName", new NameAndCols("t1", columnIdsByName));
    return tablesByName;
  }

  /** Creates a column definition with the given name and type name, passing {@code false} as the type's flag. */
  private static SpannerColumnDefinition column(String name, String typeName) {
    return new SpannerColumnDefinition(name, new SpannerColumnType(typeName, false));
  }
}
Loading
Loading