Skip to content

Commit

Permalink
Merge pull request #48 from GoogleCloudPlatform/main
Browse files Browse the repository at this point in the history
Metadata config and pipeline options (GoogleCloudPlatform#2081)
  • Loading branch information
taherkl authored Jan 7, 2025
2 parents ae3037a + 23be7bc commit ef5ae8d
Show file tree
Hide file tree
Showing 23 changed files with 1,793 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class Constants {
/* The source type value for MySql databases */
public static final String MYSQL_SOURCE_TYPE = "mysql";

/* The source type value for CASSANDRA databases */
public static final String CASSANDRA_SOURCE_TYPE = "cassandra";

/* The value for Oracle databases in the source type key */
public static final String ORACLE_SOURCE_TYPE = "oracle";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.metadata;

import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ColumnPK;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NameAndCols;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnDefinition;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceTable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* The {@code CassandraSourceMetadata} class is responsible for generating metadata from a Cassandra
* schema using a {@link ResultSet}, converting it into a Spanner-compatible format, and managing
* this metadata to facilitate schema migration operations.
*
* <p>This class supports the following functionalities:
*
* <ul>
* <li>Extracting table, column, and primary key information from Cassandra's schema.
* <li>Converting Cassandra schema details into Spanner-compatible objects like {@link
* SourceTable}.
* <li>Updating a provided {@link Schema} instance with the extracted metadata.
* </ul>
*
* <p>The metadata extraction process uses the {@link ResultSet} containing the schema details from
* Cassandra, such as table names, column definitions, data types, and primary key details.
*
* <p><strong>Note:</strong> This class does not perform direct database interactions; it relies on
* a pre-populated {@link ResultSet}.
*/
public class CassandraSourceMetadata {

private Map<String, SourceTable> sourceTableMap;
private Map<String, NameAndCols> nameAndColsMap;

private final ResultSet resultSet;

/**
* Retrieves the map of source tables.
*
* @return A {@link Map} where the key is a {@link String} representing the table name, and the
* value is a {@link SourceTable} containing the source table details.
*/
public Map<String, SourceTable> getSourceTableMap() {
return sourceTableMap;
}

/**
* Sets the map of source tables.
*
* @param sourceTableMap A {@link Map} where the key is a {@link String} representing the table
* name, and the value is a {@link SourceTable} containing the source table details.
*/
public void setSourceTableMap(Map<String, SourceTable> sourceTableMap) {
this.sourceTableMap = sourceTableMap;
}

/**
* Retrieves the map of names and columns.
*
* @return A {@link Map} where the key is a {@link String} representing the table name, and the
* value is a {@link NameAndCols} containing the name and columns metadata.
*/
public Map<String, NameAndCols> getNameAndColsMap() {
return nameAndColsMap;
}

/**
* Sets the map of names and columns.
*
* @param nameAndColsMap A {@link Map} where the key is a {@link String} representing the table
* name, and the value is a {@link NameAndCols} containing the name and columns metadata.
*/
public void setNameAndColsMap(Map<String, NameAndCols> nameAndColsMap) {
this.nameAndColsMap = nameAndColsMap;
}

/**
* Private constructor to initialize {@link CassandraSourceMetadata}.
*
* @param resultSet The {@link ResultSet} containing Cassandra schema metadata. Cannot be null.
*/
private CassandraSourceMetadata(ResultSet resultSet) {
this.resultSet = Objects.requireNonNull(resultSet, "ResultSet cannot be null");
}

/**
* Generates a map of table names to {@link SourceTable} objects, representing the schema of the
* Cassandra source in a Spanner-compatible format.
*
* @return A map where keys are table names and values are {@link SourceTable} objects containing
* schema details.
*/
private Map<String, SourceTable> generateSourceSchema() {
Map<String, Map<String, SourceColumnDefinition>> colDefinitions = new HashMap<>();
Map<String, List<ColumnPK>> columnPKs = new HashMap<>();
Map<String, List<String>> columnIds = new HashMap<>();
Set<String> tableNames = new HashSet<>();

resultSet.forEach(
row -> {
String tableName = row.getString("table_name");
String columnName = row.getString("column_name");
String dataType = row.getString("type");
String kind = row.getString("kind");

tableNames.add(tableName);

colDefinitions
.computeIfAbsent(tableName, k -> new HashMap<>())
.put(
columnName,
new SourceColumnDefinition(
columnName, new SourceColumnType(dataType, null, null)));

if (isPrimaryKey(kind)) {
columnPKs
.computeIfAbsent(tableName, k -> new ArrayList<>())
.add(new ColumnPK(columnName, 1));
}

columnIds.computeIfAbsent(tableName, k -> new ArrayList<>()).add(columnName);
});

return tableNames.stream()
.collect(
Collectors.toMap(
tableName -> tableName,
tableName ->
new SourceTable(
tableName,
null,
columnIds.getOrDefault(tableName, List.of()).toArray(new String[0]),
colDefinitions.getOrDefault(tableName, Map.of()),
columnPKs.getOrDefault(tableName, List.of()).toArray(new ColumnPK[0]))));
}

/**
* Updates the provided {@link Schema} with metadata generated from the Cassandra {@link
* ResultSet}.
*
* <p>This method extracts schema details, transforms them into Spanner-compatible objects, and
* sets the corresponding properties in the provided {@link Schema}.
*/
public void generateSourceSchemaMap() {
Map<String, SourceTable> sourceTableMap = generateSourceSchema();
this.setSourceTableMap(sourceTableMap);
this.setNameAndColsMap(convertSourceToNameAndColsTable(sourceTableMap.values()));
}

/**
* Determines whether a column is part of the primary key based on its kind.
*
* @param kind The column kind, such as "partition_key" or "clustering".
* @return {@code true} if the column is a primary key; {@code false} otherwise.
*/
private boolean isPrimaryKey(String kind) {
return "partition_key".equals(kind) || "clustering".equals(kind);
}

/**
* Converts a collection of {@link SourceTable} objects into a map of table names to {@link
* NameAndCols}.
*
* @param tables A collection of {@link SourceTable} objects representing the Cassandra schema.
* @return A map where keys are table names and values are {@link NameAndCols}.
*/
private Map<String, NameAndCols> convertSourceToNameAndColsTable(Collection<SourceTable> tables) {
return tables.stream()
.collect(Collectors.toMap(SourceTable::getName, this::convertSourceTableToNameAndCols));
}

/**
* Converts a single {@link SourceTable} into a {@link NameAndCols} instance.
*
* @param sourceTable The {@link SourceTable} to convert.
* @return A {@link NameAndCols} object containing the table name and column names.
*/
private NameAndCols convertSourceTableToNameAndCols(SourceTable sourceTable) {
Map<String, String> columnNames =
sourceTable.getColDefs().values().stream()
.collect(
Collectors.toMap(SourceColumnDefinition::getName, SourceColumnDefinition::getName));

return new NameAndCols(sourceTable.getName(), columnNames);
}

/**
* Builder class for creating instances of {@link CassandraSourceMetadata}.
*
* <p>The builder allows for incremental configuration of the {@link ResultSet} and {@link Schema}
* before constructing the final {@link CassandraSourceMetadata} instance.
*/
public static class Builder {
private ResultSet resultSet;

/**
* Sets the {@link ResultSet} for the builder.
*
* @param resultSet The {@link ResultSet} containing Cassandra schema information.
* @return The current {@link Builder} instance.
*/
public Builder setResultSet(ResultSet resultSet) {
this.resultSet = resultSet;
return this;
}

/**
* Builds an instance of {@link CassandraSourceMetadata}, generating and setting the schema
* metadata.
*
* @return A fully constructed {@link CassandraSourceMetadata} instance.
*/
public CassandraSourceMetadata build() {
CassandraSourceMetadata cassandraSourceMetadata = new CassandraSourceMetadata(resultSet);
cassandraSourceMetadata.generateSourceSchemaMap();
return cassandraSourceMetadata;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ public Schema() {
this.empty = true;
}

public Schema(
Map<String, SpannerTable> spSchema,
Map<String, SyntheticPKey> syntheticPKeys,
Map<String, SourceTable> srcSchema,
Map<String, NameAndCols> toSpanner,
Map<String, NameAndCols> toSource) {
this.spSchema = spSchema;
this.syntheticPKeys = syntheticPKeys;
this.srcSchema = srcSchema;
this.toSpanner = toSpanner;
this.toSource = toSource;
this.empty = (spSchema == null || srcSchema == null);
}

public Schema(
Map<String, SpannerTable> spSchema,
Map<String, SyntheticPKey> syntheticPKeys,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.shard;

import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import java.util.List;
import java.util.Objects;

public class CassandraShard extends Shard {
private final OptionsMap optionsMap;

public CassandraShard(OptionsMap optionsMap) {
super();
this.optionsMap = optionsMap;
validateFields();
extractAndSetHostAndPort();
}

private void validateFields() {
if (getContactPoints() == null || getContactPoints().isEmpty()) {
throw new IllegalArgumentException("CONTACT_POINTS cannot be null or empty.");
}
if (getKeySpaceName() == null || getKeySpaceName().isEmpty()) {
throw new IllegalArgumentException("SESSION_KEYSPACE cannot be null or empty.");
}
}

private void extractAndSetHostAndPort() {
String firstContactPoint = getContactPoints().get(0);
String[] parts = firstContactPoint.split(":");

if (parts.length < 2) {
throw new IllegalArgumentException("Invalid contact point format: " + firstContactPoint);
}

String host = parts[0];
String port = parts[1];

setHost(host);
setPort(port);
}

private String getOptionValue(TypedDriverOption<String> key) {
return optionsMap.get(key);
}

private List<String> getOptionValueList(TypedDriverOption<List<String>> key) {
return optionsMap.get(key);
}

// Getters that fetch data from OptionsMap
public List<String> getContactPoints() {
return getOptionValueList(TypedDriverOption.CONTACT_POINTS);
}

public String getKeySpaceName() {
return getOptionValue(TypedDriverOption.SESSION_KEYSPACE);
}

public String getUsername() {
return getOptionValue(TypedDriverOption.AUTH_PROVIDER_USER_NAME);
}

public OptionsMap getOptionsMap() {
return this.optionsMap;
}

@Override
public String toString() {
return String.format(
"CassandraShard{logicalShardId='%s', contactPoints=%s, keyspace='%s', host='%s', port='%s'}",
getLogicalShardId(), getContactPoints(), getKeySpaceName(), getHost(), getPort());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CassandraShard)) {
return false;
}
CassandraShard that = (CassandraShard) o;
return Objects.equals(getContactPoints(), that.getContactPoints())
&& Objects.equals(getKeySpaceName(), that.getKeySpaceName());
}

@Override
public int hashCode() {
return Objects.hash(getContactPoints(), getKeySpaceName());
}
}
Loading

0 comments on commit ef5ae8d

Please sign in to comment.