
Commit

Merge pull request #38 from datastax/feature/frametoolongexception
Added capability to customize page-size (fixes FrameTooLongException)
pravinbhat authored Dec 1, 2022
2 parents 869b8fc + 02e6061 commit baaf92c
Showing 9 changed files with 41 additions and 33 deletions.
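For context on the headline change: the source range reads previously ran with the Java driver's default page size, so tables with very large rows could overflow a single response frame and fail with FrameTooLongException. This commit routes a new spark.read.fetch.sizeInRows property (default 1000) into Statement.setPageSize() on the source select. A minimal sketch of that read path, assuming a reachable cluster; the property name, default, and the test.a1 table come from this diff, while the column name and the rest are illustrative rather than the migrator's exact code:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;

public class PageSizeExample {
    public static void main(String[] args) {
        try (CqlSession session = CqlSession.builder().build()) {
            // In the migrator this value comes from spark.read.fetch.sizeInRows (default 1000).
            int fetchSizeInRows = 1000;

            // Full-scan one token range of the source table, as CopyJobSession does per split.
            PreparedStatement select = session.prepare(
                    "SELECT * FROM test.a1 WHERE token(pk) >= ? AND token(pk) <= ?");

            // setPageSize() caps how many rows each response page carries, so wide rows no
            // longer push a single frame past the driver's maximum frame length.
            BoundStatement bound = select.bind(Long.MIN_VALUE, Long.MAX_VALUE)
                    .setPageSize(fetchSizeInRows);

            for (Row row : session.execute(bound)) {
                // process each row; the driver fetches later pages transparently
            }
        }
    }
}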
2 changes: 1 addition & 1 deletion pom.xml
@@ -3,7 +3,7 @@

<groupId>datastax.astra.migrate</groupId>
<artifactId>cassandra-data-migrator</artifactId>
<version>2.7</version>
<version>2.8</version>
<packaging>jar</packaging>

<properties>
6 changes: 5 additions & 1 deletion src/main/java/datastax/astra/migrate/AbstractJobSession.java
@@ -10,6 +10,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Map;
import java.util.stream.IntStream;

@@ -26,6 +27,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
this.astraSession = astraSession;

batchSize = new Integer(Util.getSparkPropOr(sc, "spark.batchSize", "1"));
fetchSizeInRows = new Integer(Util.getSparkPropOr(sc, "spark.read.fetch.sizeInRows", "1000"));
printStatsAfter = new Integer(Util.getSparkPropOr(sc, "spark.printStatsAfter", "100000"));
if (printStatsAfter < 1) {
printStatsAfter = 100000;
@@ -77,6 +79,7 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
}

logger.info("PARAM -- Write Batch Size: {}", batchSize);
logger.info("PARAM -- Read Fetch Size: {}", fetchSizeInRows);
logger.info("PARAM -- Source Keyspace Table: {}", sourceKeyspaceTable);
logger.info("PARAM -- Destination Keyspace Table: {}", astraKeyspaceTable);
logger.info("PARAM -- ReadRateLimit: {}", readLimiter.getRate());
@@ -207,7 +210,8 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
}
}

return boundInsertStatement;
// Batch insert for large records may take longer, hence 10 secs to avoid timeout errors
return boundInsertStatement.setTimeout(Duration.ofSeconds(10));
}

public int getLargestTTL(Row sourceRow) {
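The other functional change in AbstractJobSession is a fixed 10-second per-statement timeout on the bound insert, since batched writes of large records can outlast the driver's default request timeout. A small illustrative sketch of that call (table and column names are hypothetical, taken loosely from the sample sparkConf.properties, not the migrator's exact code):

import java.time.Duration;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

public class InsertTimeoutExample {
    // Bind an insert and give it a per-statement timeout, mirroring bindInsert() above.
    static BoundStatement bindWithTimeout(CqlSession session, String key, String payload) {
        PreparedStatement insert = session.prepare(
                "INSERT INTO test.a2 (pk, payload) VALUES (?, ?)");
        return insert.bind(key, payload)
                // Large records may take longer than the default request timeout;
                // 10 seconds matches the value hard-coded in this commit.
                .setTimeout(Duration.ofSeconds(10));
    }
}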
2 changes: 2 additions & 0 deletions src/main/java/datastax/astra/migrate/BaseJobSession.java
@@ -33,6 +33,7 @@ public abstract class BaseJobSession {
protected List<Integer> updateSelectMapping = new ArrayList<Integer>();

protected Integer batchSize = 1;
protected Integer fetchSizeInRows = 1000;
protected Integer printStatsAfter = 100000;

protected Boolean writeTimeStampFilter = Boolean.FALSE;
@@ -67,6 +68,7 @@ public String getKey(Row sourceRow) {

return key.toString();
}

public List<MigrateDataType> getTypes(String types) {
List<MigrateDataType> dataTypes = new ArrayList<MigrateDataType>();
for (String type : types.split(",")) {
14 changes: 8 additions & 6 deletions src/main/java/datastax/astra/migrate/CopyJobSession.java
@@ -47,7 +47,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) {

try {
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact()));
ResultSet resultSet = sourceSession.execute(sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(),
hasRandomPartitioner ? max : max.longValueExact()).setPageSize(fetchSizeInRows));
Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<CompletionStage<AsyncResultSet>>();

// cannot do batching if the writeFilter is greater than 0 or
@@ -87,15 +88,16 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
astraRow = astraReadResultSet.one();
}


CompletionStage<AsyncResultSet> astraWriteResultSet = astraSession
.executeAsync(bindInsert(astraInsertStatement, sourceRow, astraRow));
writeResults.add(astraWriteResultSet);
if (writeResults.size() > 1000) {
if (writeResults.size() > fetchSizeInRows) {
iterateAndClearWriteResults(writeResults, 1);
}
}

// clear the write resultset in-case it didnt mod at 1000 above
// clear the write resultset
iterateAndClearWriteResults(writeResults, 1);
} else {
BatchStatement batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
@@ -124,12 +126,12 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
batchStatement = BatchStatement.newInstance(BatchType.UNLOGGED);
}

if (writeResults.size() * batchSize > 1000) {
if (writeResults.size() * batchSize > fetchSizeInRows) {
iterateAndClearWriteResults(writeResults, batchSize);
}
}

// clear the write resultset in-case it didnt mod at 1000 above
// clear the write resultset
iterateAndClearWriteResults(writeResults, batchSize);

// if there are any pending writes because the batchSize threshold was not met, then write and clear them
@@ -173,4 +175,4 @@ private void iterateAndClearWriteResults(Collection<CompletionStage<AsyncResultS
writeResults.clear();
}

}
}
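CopyJobSession now also uses fetchSizeInRows as the back-pressure threshold for in-flight async writes, replacing the hard-coded 1000. A simplified sketch of that accumulate-and-flush pattern; the class and method names here are illustrative, not the exact migrator code:

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;

import com.datastax.oss.driver.api.core.cql.AsyncResultSet;

public class WriteFlusher {
    private final Collection<CompletionStage<AsyncResultSet>> writeResults = new ArrayList<>();
    private final AtomicLong writeCounter = new AtomicLong(0);
    private final int flushThreshold; // fetchSizeInRows in the migrator

    public WriteFlusher(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    // Called after each executeAsync(); only blocks once too many writes are in flight.
    public void track(CompletionStage<AsyncResultSet> writeResult, int incrementBy) throws Exception {
        writeResults.add(writeResult);
        if (writeResults.size() > flushThreshold) {
            flush(incrementBy);
        }
    }

    // Wait for every pending write, bump the progress counter, then start a fresh window.
    public void flush(int incrementBy) throws Exception {
        for (CompletionStage<AsyncResultSet> writeResult : writeResults) {
            writeResult.toCompletableFuture().get();
            writeCounter.addAndGet(incrementBy);
        }
        writeResults.clear();
    }
}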
5 changes: 3 additions & 2 deletions src/main/java/datastax/astra/migrate/DiffJobSession.java
@@ -62,7 +62,8 @@ public void getDataAndDiff(BigInteger min, BigInteger max) {
try {
// cannot do batching if the writeFilter is greater than 0
ResultSet resultSet = sourceSession.execute(
sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact()).setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
sourceSelectStatement.bind(hasRandomPartitioner ? min : min.longValueExact(), hasRandomPartitioner ? max : max.longValueExact())
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM).setPageSize(fetchSizeInRows));

Map<Row, CompletionStage<AsyncResultSet>> srcToTargetRowMap = new HashMap<Row, CompletionStage<AsyncResultSet>>();
StreamSupport.stream(resultSet.spliterator(), false).forEach(srcRow -> {
@@ -77,7 +78,7 @@
CompletionStage<AsyncResultSet> targetRowFuture = astraSession
.executeAsync(selectFromAstra(astraSelectStatement, srcRow));
srcToTargetRowMap.put(srcRow, targetRowFuture);
if (srcToTargetRowMap.size() > 1000) {
if (srcToTargetRowMap.size() > fetchSizeInRows) {
diffAndClear(srcToTargetRowMap);
}
} else {
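DiffJobSession applies the same configurable threshold to the validation reads it keeps in flight. A compact, hypothetical sketch of that compare-and-clear loop (the real class also tracks match/mismatch counts and can auto-correct when the spark.target.autocorrect.* properties are enabled):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;

import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;

public class RowDiffChecker {
    private final Map<Row, CompletionStage<AsyncResultSet>> srcToTargetRowMap = new HashMap<>();
    private final int flushThreshold; // fetchSizeInRows in the migrator

    public RowDiffChecker(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    // Queue an async target read for each source row; compare once enough are in flight.
    public void queue(Row srcRow, CompletionStage<AsyncResultSet> targetRowFuture) throws Exception {
        srcToTargetRowMap.put(srcRow, targetRowFuture);
        if (srcToTargetRowMap.size() > flushThreshold) {
            diffAndClear();
        }
    }

    public void diffAndClear() throws Exception {
        for (Map.Entry<Row, CompletionStage<AsyncResultSet>> entry : srcToTargetRowMap.entrySet()) {
            Row targetRow = entry.getValue().toCompletableFuture().get().one();
            if (targetRow == null) {
                // record the row as missing on the target side
            } else {
                // compare column values field by field and record any mismatch
            }
        }
        srcToTargetRowMap.clear();
    }
}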
12 changes: 6 additions & 6 deletions src/main/scala/datastax/astra/migrate/AbstractJob.scala
@@ -10,13 +10,13 @@ class AbstractJob extends BaseJob {
abstractLogger.info("PARAM -- Split Size: " + splitSize)
abstractLogger.info("PARAM -- Coverage Percent: " + coveragePercent)

var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword,
sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);

var destinationConnection = getConnection(false, destinationIsAstra, destinationScbPath, destinationHost, destinationUsername, destinationPassword, destinationReadConsistencyLevel,
var destinationConnection = getConnection(false, destinationIsAstra, destinationScbPath, destinationHost, destinationUsername, destinationPassword,
destinationTrustStorePath, destinationTrustStorePassword, destinationTrustStoreType, destinationKeyStorePath, destinationKeyStorePassword, destinationEnabledAlgorithms);

private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String, readConsistencyLevel: String,
private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String,
trustStorePath: String, trustStorePassword: String, trustStoreType: String,
keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
var connType: String = "Source"
@@ -31,7 +31,7 @@
return CassandraConnector(config
.set("spark.cassandra.auth.username", username)
.set("spark.cassandra.auth.password", password)
.set("spark.cassandra.input.consistency.level", readConsistencyLevel)
.set("spark.cassandra.input.consistency.level", consistencyLevel)
.set("spark.cassandra.connection.config.cloud.path", scbPath))
} else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
abstractLogger.info(connType + ": Connecting to Cassandra (or DSE) with SSL host: " + host);
@@ -45,7 +45,7 @@
return CassandraConnector(config
.set("spark.cassandra.auth.username", username)
.set("spark.cassandra.auth.password", password)
.set("spark.cassandra.input.consistency.level", readConsistencyLevel)
.set("spark.cassandra.input.consistency.level", consistencyLevel)
.set("spark.cassandra.connection.host", host)
.set("spark.cassandra.connection.ssl.enabled", "true")
.set("spark.cassandra.connection.ssl.enabledAlgorithms", enabledAlgorithmsVar)
@@ -61,7 +61,7 @@

return CassandraConnector(config.set("spark.cassandra.auth.username", username)
.set("spark.cassandra.auth.password", password)
.set("spark.cassandra.input.consistency.level", readConsistencyLevel)
.set("spark.cassandra.input.consistency.level", consistencyLevel)
.set("spark.cassandra.connection.host", host))
}

6 changes: 3 additions & 3 deletions src/main/scala/datastax/astra/migrate/BaseJob.scala
@@ -18,12 +18,13 @@ class BaseJob extends App {
val sContext = spark.sparkContext
val sc = sContext.getConf

val consistencyLevel = Util.getSparkPropOr(sc, "spark.read.consistency.level", "LOCAL_QUORUM")

val sourceIsAstra = Util.getSparkPropOr(sc, "spark.origin.isAstra", "false")
val sourceScbPath = Util.getSparkPropOrEmpty(sc, "spark.origin.scb")
val sourceHost = Util.getSparkPropOrEmpty(sc, "spark.origin.host")
val sourceUsername = Util.getSparkPropOrEmpty(sc, "spark.origin.username")
val sourcePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.password")
val sourceReadConsistencyLevel = Util.getSparkPropOr(sc, "spark.origin.read.consistency.level", "LOCAL_QUORUM")
val sourceTrustStorePath = Util.getSparkPropOrEmpty(sc, "spark.origin.trustStore.path")
val sourceTrustStorePassword = Util.getSparkPropOrEmpty(sc, "spark.origin.trustStore.password")
val sourceTrustStoreType = Util.getSparkPropOr(sc, "spark.origin.trustStore.type", "JKS")
@@ -36,7 +37,6 @@
val destinationHost = Util.getSparkPropOrEmpty(sc, "spark.target.host")
val destinationUsername = Util.getSparkProp(sc, "spark.target.username")
val destinationPassword = Util.getSparkProp(sc, "spark.target.password")
val destinationReadConsistencyLevel = Util.getSparkPropOr(sc, "spark.target.read.consistency.level", "LOCAL_QUORUM")
val destinationTrustStorePath = Util.getSparkPropOrEmpty(sc, "spark.target.trustStore.path")
val destinationTrustStorePassword = Util.getSparkPropOrEmpty(sc, "spark.target.trustStore.password")
val destinationTrustStoreType = Util.getSparkPropOr(sc, "spark.target.trustStore.type", "JKS")
@@ -48,7 +48,7 @@
val maxPartition = new BigInteger(Util.getSparkPropOr(sc, "spark.origin.maxPartition", "9223372036854775807"))
val coveragePercent = Util.getSparkPropOr(sc, "spark.coveragePercent", "100")
val splitSize = Integer.parseInt(Util.getSparkPropOr(sc, "spark.splitSize", "10000"))

protected def exitSpark() = {
spark.stop()
abstractLogger.info("################################################################################################")
13 changes: 5 additions & 8 deletions src/main/scala/datastax/astra/migrate/OriginData.scala
@@ -9,13 +9,13 @@ object OriginData extends BaseJob {

val logger = LoggerFactory.getLogger(this.getClass.getName)
logger.info("Started Migration App")
var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword, sourceReadConsistencyLevel,
var sourceConnection = getConnection(true, sourceIsAstra, sourceScbPath, sourceHost, sourceUsername, sourcePassword,
sourceTrustStorePath, sourceTrustStorePassword, sourceTrustStoreType, sourceKeyStorePath, sourceKeyStorePassword, sourceEnabledAlgorithms);
analyzeSourceTable(sourceConnection)
exitSpark


private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String, readConsistencyLevel: String,
private def getConnection(isSource: Boolean, isAstra: String, scbPath: String, host: String, username: String, password: String,
trustStorePath: String, trustStorePassword: String, trustStoreType: String,
keyStorePath: String, keyStorePassword: String, enabledAlgorithms: String): CassandraConnector = {
var connType: String = "Source"
@@ -26,7 +26,7 @@ object OriginData extends BaseJob {
return CassandraConnector(sc
.set("spark.cassandra.auth.username", username)
.set("spark.cassandra.auth.password", password)
.set("spark.cassandra.input.consistency.level", readConsistencyLevel)
.set("spark.cassandra.input.consistency.level", consistencyLevel)
.set("spark.cassandra.connection.config.cloud.path", scbPath))
} else if (null != trustStorePath && !trustStorePath.trim.isEmpty) {
abstractLogger.info(connType + ": Connected to Cassandra (or DSE) with SSL!");
@@ -40,7 +40,7 @@ object OriginData extends BaseJob {
return CassandraConnector(sc
.set("spark.cassandra.auth.username", username)
.set("spark.cassandra.auth.password", password)
.set("spark.cassandra.input.consistency.level", readConsistencyLevel)
.set("spark.cassandra.input.consistency.level", consistencyLevel)
.set("spark.cassandra.connection.host", host)
.set("spark.cassandra.connection.ssl.enabled", "true")
.set("spark.cassandra.connection.ssl.enabledAlgorithms", enabledAlgorithmsVar)
@@ -56,7 +56,7 @@

return CassandraConnector(sc.set("spark.cassandra.auth.username", username)
.set("spark.cassandra.auth.password", password)
.set("spark.cassandra.input.consistency.level", readConsistencyLevel)
.set("spark.cassandra.input.consistency.level", consistencyLevel)
.set("spark.cassandra.connection.host", host))
}

@@ -78,6 +78,3 @@

}




14 changes: 8 additions & 6 deletions src/resources/sparkConf.properties
@@ -2,27 +2,21 @@ spark.origin.isAstra false
spark.origin.host localhost
spark.origin.username some-username
spark.origin.password some-secret-password
spark.origin.read.consistency.level LOCAL_QUORUM
spark.origin.keyspaceTable test.a1

spark.target.isAstra true
spark.target.scb file:///aaa/bbb/secure-connect-enterprise.zip
spark.target.username client-id
spark.target.password client-secret
spark.target.read.consistency.level LOCAL_QUORUM
spark.target.keyspaceTable test.a2
spark.target.autocorrect.missing false
spark.target.autocorrect.mismatch false
spark.target.custom.writeTime 0

spark.maxRetries 10
spark.readRateLimit 20000
spark.writeRateLimit 20000
spark.splitSize 10000
spark.batchSize 5
spark.coveragePercent 100
spark.printStatsAfter 100000
spark.fieldGuardraillimitMB 10

spark.query.origin partition-key,clustering-key,order-date,amount
spark.query.origin.partitionKey partition-key
@@ -40,6 +34,14 @@ spark.origin.writeTimeStampFilter false
spark.origin.minWriteTimeStampFilter 0
spark.origin.maxWriteTimeStampFilter 9223372036854775807

########################## ONLY CHANGE IF YOU KNOW WHAT YOU ARE DOING ###############################
#spark.coveragePercent 100
#spark.printStatsAfter 100000
#spark.read.consistency.level LOCAL_QUORUM
#spark.read.fetch.sizeInRows 1000
#spark.target.custom.writeTime 0
#spark.fieldGuardraillimitMB 10

################### ONLY USE if needing to get record count of recs greater than 10MB from Origin ######################
#spark.origin.checkTableforColSize false
#spark.origin.checkTableforColSize.cols partition-key,clustering-key
