diff --git a/scalardb-test/schema/tx_sensor.cql b/scalardb-test/schema/tx_sensor.cql index 3bd6b85..79da113 100644 --- a/scalardb-test/schema/tx_sensor.cql +++ b/scalardb-test/schema/tx_sensor.cql @@ -2,6 +2,7 @@ DROP KEYSPACE IF EXISTS sensor; CREATE KEYSPACE IF NOT EXISTS sensor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }; DROP KEYSPACE IF EXISTS coordinator; CREATE KEYSPACE IF NOT EXISTS coordinator WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }; +CREATE KEYSPACE IF NOT EXISTS scalardb WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }; CREATE TABLE IF NOT EXISTS sensor.tx_sensor ( timestamp int, @@ -27,3 +28,12 @@ CREATE TABLE IF NOT EXISTS coordinator.state ( tx_created_at bigint, PRIMARY KEY (tx_id) ); + +CREATE TABLE IF NOT EXISTS scalardb.namespaces ( + name text, + PRIMARY KEY (name) +); + +INSERT INTO scalardb.namespaces (name) VALUES ('sensor'); +INSERT INTO scalardb.namespaces (name) VALUES ('coordinator'); +INSERT INTO scalardb.namespaces (name) VALUES ('scalardb'); diff --git a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorChecker.java b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorChecker.java index 5b86686..4d033c1 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorChecker.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorChecker.java @@ -40,7 +40,7 @@ public void execute() { } if (isDuplicated) { - logError("dupilication happened !"); + logError("Duplication happened !"); throw new PostProcessException("Inconsistency happened!"); } } diff --git a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorCommon.java b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorCommon.java index f4b98c1..2a7e463 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorCommon.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorCommon.java @@ -55,7 +55,7 @@ public static int getMaxRevision(List results) { return maxRevision.orElse(0); } - private static int getRevisionFromResult(Result result) { + public static int getRevisionFromResult(Result result) { return result.getInt(REVISION); } } diff --git a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorProcessor.java index ad96d92..6470bc5 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorProcessor.java @@ -5,14 +5,20 @@ import com.scalar.db.api.Put; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; +import com.scalar.db.api.TransactionCrudOperable; import com.scalar.db.exception.transaction.TransactionException; import com.scalar.db.exception.transaction.UnknownTransactionStatusException; import com.scalar.kelpie.config.Config; import com.scalar.kelpie.exception.ProcessFatalException; import com.scalar.kelpie.modules.TimeBasedProcessor; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.json.Json; import kelpie.scalardb.Common; @@ -21,6 +27,7 @@ public class SensorProcessor extends TimeBasedProcessor { private final int numDevices; private final AtomicBoolean isVerification; private final int startTimestamp; + private final AtomicInteger numAttempts = new AtomicInteger(); public SensorProcessor(Config config) { super(config); @@ -67,13 +74,43 @@ public void close() { private void updateRevision(DistributedTransaction transaction, int timestamp, int deviceId) throws TransactionException { - Scan scan = SensorCommon.prepareScan(timestamp); - List results = transaction.scan(scan); - boolean hasDuplicatedRevision = SensorCommon.hasDuplicatedRevision(results); + boolean hasDuplicatedRevision; + List results; + + // Alternate between scan() and getScanner() based on the attempt count. + boolean scannerUsed = numAttempts.getAndIncrement() % 2 == 0; + if (!scannerUsed) { + // Use scan() + results = transaction.scan(scan); + hasDuplicatedRevision = SensorCommon.hasDuplicatedRevision(results); + } else { + // Use getScanner() + hasDuplicatedRevision = false; + results = new ArrayList<>(); + try (TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan)) { + Set tempSet = new HashSet<>(); + while (true) { + Optional result = scanner.one(); + if (!result.isPresent()) { + break; + } + + int revision = SensorCommon.getRevisionFromResult(result.get()); + if (!tempSet.add(revision)) { + hasDuplicatedRevision = true; + break; + } + + results.add(result.get()); + } + } + } + if (hasDuplicatedRevision) { - throw new ProcessFatalException("A revision is duplicated at " + timestamp); + throw new ProcessFatalException( + "A revision is duplicated. timestamp: " + timestamp + "; scannerUsed: " + scannerUsed); } int revision = SensorCommon.getMaxRevision(results) + 1;