Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra pr bug fixes #64

Merged
merged 5 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,19 @@ public CassandraDao(

@Override
public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
try (CqlSession session =
(CqlSession)
connectionHelper.getConnection(this.cassandraUrl)) { // Ensure connection is obtained
if (session == null) {
throw new ConnectionException("Connection is null");
}
PreparedStatementGeneratedResponse preparedStatementGeneratedResponse =
(PreparedStatementGeneratedResponse) dmlGeneratorResponse;
String dmlStatement = preparedStatementGeneratedResponse.getDmlStatement();
PreparedStatement preparedStatement = session.prepare(dmlStatement);
BoundStatement boundStatement =
preparedStatement.bind(
preparedStatementGeneratedResponse.getValues().stream()
.map(v -> CassandraTypeHandler.castToExpectedType(v.dataType(), v.value()))
.toArray());
session.execute(boundStatement);
CqlSession session = (CqlSession) connectionHelper.getConnection(this.cassandraUrl);
if (session == null) {
throw new ConnectionException("Connection is null");
}
PreparedStatementGeneratedResponse preparedStatementGeneratedResponse =
(PreparedStatementGeneratedResponse) dmlGeneratorResponse;
String dmlStatement = preparedStatementGeneratedResponse.getDmlStatement();
PreparedStatement preparedStatement = session.prepare(dmlStatement);
BoundStatement boundStatement =
preparedStatement.bind(
preparedStatementGeneratedResponse.getValues().stream()
.map(v -> CassandraTypeHandler.castToExpectedType(v.dataType(), v.value()))
.toArray());
session.execute(boundStatement);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,7 @@ private static DMLGeneratorResponse getDeleteStatementCQL(
deleteConditions.setLength(deleteConditions.length() - 5);
}

String preparedStatement =
"DELETE FROM " + tableName + " WHERE " + deleteConditions + " USING TIMESTAMP ?;";

PreparedStatementValueObject<Long> timestampObj =
PreparedStatementValueObject.create("USING_TIMESTAMP", timestamp);
values.add(timestampObj);
String preparedStatement = "DELETE FROM " + tableName + " WHERE " + deleteConditions + ";";

return new PreparedStatementGeneratedResponse(preparedStatement, values);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.BooleanUtils;
import org.eclipse.jetty.util.StringUtil;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -242,11 +242,11 @@ private static Boolean handleCassandraBoolType(String colName, JSONObject values
* @return a {@link Float} object containing the value represented in cassandra type.
*/
private static Float handleCassandraFloatType(String colName, JSONObject valuesJson) {
try {
return valuesJson.getBigDecimal(colName).floatValue();
} catch (JSONException e) {
BigDecimal colValue = valuesJson.optBigDecimal(colName, null);
if (colValue == null) {
return null;
}
return colValue.floatValue();
}

/**
Expand All @@ -257,11 +257,11 @@ private static Float handleCassandraFloatType(String colName, JSONObject valuesJ
* @return a {@link Double} object containing the value represented in cassandra type.
*/
private static Double handleCassandraDoubleType(String colName, JSONObject valuesJson) {
try {
return valuesJson.getBigDecimal(colName).doubleValue();
} catch (JSONException e) {
BigDecimal colValue = valuesJson.optBigDecimal(colName, null);
if (colValue == null) {
return null;
}
return colValue.doubleValue();
}

/**
Expand Down Expand Up @@ -399,11 +399,11 @@ private static UUID handleCassandraUuidType(String colName, JSONObject valuesJso
* @return a {@link Long} object containing Long as value represented in cassandra type.
*/
private static Long handleCassandraBigintType(String colName, JSONObject valuesJson) {
try {
return valuesJson.getBigInteger(colName).longValue();
} catch (JSONException e) {
BigInteger colValue = valuesJson.optBigInteger(colName, null);
if (colValue == null) {
return null;
}
return colValue.longValue();
}

/**
Expand All @@ -414,11 +414,11 @@ private static Long handleCassandraBigintType(String colName, JSONObject valuesJ
* @return a {@link Integer} object containing Integer as value represented in cassandra type.
*/
private static Integer handleCassandraIntType(String colName, JSONObject valuesJson) {
try {
return valuesJson.getBigInteger(colName).intValue();
} catch (JSONException e) {
BigInteger colValue = valuesJson.optBigInteger(colName, null);
if (colValue == null) {
return null;
}
return colValue.intValue();
}

/**
Expand Down Expand Up @@ -458,8 +458,7 @@ private static Instant convertToCassandraTimestamp(String timestampValue) {
return LocalDate.from(temporal).atStartOfDay(ZoneOffset.UTC).toInstant();
}
} catch (DateTimeParseException ignored) {
throw new IllegalArgumentException(
"Failed to parse timestamp value" + timestampValue, ignored);
LOG.info("Exception found from different formatter " + ignored.getMessage());
}
}
throw new IllegalArgumentException("Failed to parse timestamp value: " + timestampValue);
Expand Down Expand Up @@ -611,9 +610,9 @@ private static Object handleSpannerColumnType(
return CassandraTypeHandler.handleCassandraTimestampType(columnName, valuesJson);
} else if ("boolean".equals(spannerType)) {
return CassandraTypeHandler.handleCassandraBoolType(columnName, valuesJson);
} else if (spannerType.matches("numeric|float")) {
} else if (spannerType.matches("float")) {
return CassandraTypeHandler.handleCassandraFloatType(columnName, valuesJson);
} else if (spannerType.contains("float")) {
} else if (spannerType.contains("float") || spannerType.contains("numeric")) {
return CassandraTypeHandler.handleCassandraDoubleType(columnName, valuesJson);
} else if (spannerType.contains("bytes") || spannerType.contains("blob")) {
return CassandraTypeHandler.handleCassandraBlobType(columnName, valuesJson);
Expand Down Expand Up @@ -647,7 +646,8 @@ private static Object handleStringType(String colName, JSONObject valuesJson) {
return new JSONArray(inputValue);
} else if (isValidJSONObject(inputValue)) {
return new JSONObject(inputValue);
} else if (StringUtil.isHex(inputValue, 0, inputValue.length())) {
} else if (StringUtil.isHex(inputValue, 0, inputValue.length())
&& inputValue.startsWith("0x")) {
return CassandraTypeHandler.handleCassandraBlobType(colName, valuesJson);
} else if (isAscii(inputValue)) {
return CassandraTypeHandler.handleCassandraAsciiType(colName, valuesJson);
Expand Down Expand Up @@ -786,7 +786,10 @@ private static Object parseNumericType(String columnType, Object colValue) {
*/
private static Boolean parseBoolean(Object colValue) {
if (colValue instanceof String) {
return Boolean.parseBoolean((String) colValue);
if (Arrays.asList("0", "1").contains((String) colValue)) {
return colValue.equals("1");
}
return BooleanUtils.toBoolean((String) colValue);
}
return (Boolean) colValue;
}
Expand All @@ -804,8 +807,10 @@ private static Boolean parseBoolean(Object colValue) {
private static BigDecimal parseDecimal(Object colValue) {
if (colValue instanceof String) {
return new BigDecimal((String) colValue);
} else if (colValue instanceof Number) {
return BigDecimal.valueOf(((Number) colValue).doubleValue());
} else if (colValue instanceof Float) {
return BigDecimal.valueOf((Float) colValue);
} else if (colValue instanceof Double) {
return BigDecimal.valueOf((Double) colValue);
}
return (BigDecimal) colValue;
}
Expand Down Expand Up @@ -992,7 +997,8 @@ public static Object castToExpectedType(String cassandraType, Object columnValue
return parseAndCastToCassandraType(cassandraType, columnValue).value();
} catch (ClassCastException | IllegalArgumentException e) {
LOG.error("Error converting value for column: {}, type: {}", cassandraType, e.getMessage());
throw e;
throw new IllegalArgumentException(
"Error converting value for cassandraType: " + cassandraType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ public void processElement(ProcessContext c) throws Exception {

record.setShard(qualifiedShard);
String finalKeyString = tableName + "_" + keysJsonStr + "_" + qualifiedShard;
Long finalKey = finalKeyString.hashCode() % maxConnectionsAcrossAllShards;
Long finalKey =
finalKeyString.hashCode() % maxConnectionsAcrossAllShards; // The total parallelism is
// maxConnectionsAcrossAllShards
c.output(KV.of(finalKey, record));

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public void deleteMultiplePKColumns() {
.setSchema(schema)
.build());
String sql = dmlGeneratorResponse.getDmlStatement();
assertEquals(2, ((PreparedStatementGeneratedResponse) dmlGeneratorResponse).getValues().size());
assertEquals(1, ((PreparedStatementGeneratedResponse) dmlGeneratorResponse).getValues().size());
}

@Test
Expand Down
Loading
Loading