Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions scalardb-test/schema/tx_sensor.cql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ DROP KEYSPACE IF EXISTS sensor;
CREATE KEYSPACE IF NOT EXISTS sensor WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
DROP KEYSPACE IF EXISTS coordinator;
CREATE KEYSPACE IF NOT EXISTS coordinator WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE KEYSPACE IF NOT EXISTS scalardb WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };

CREATE TABLE IF NOT EXISTS sensor.tx_sensor (
timestamp int,
Expand All @@ -27,3 +28,12 @@ CREATE TABLE IF NOT EXISTS coordinator.state (
tx_created_at bigint,
PRIMARY KEY (tx_id)
);

CREATE TABLE IF NOT EXISTS scalardb.namespaces (
name text,
PRIMARY KEY (name)
);

INSERT INTO scalardb.namespaces (name) VALUES ('sensor');
INSERT INTO scalardb.namespaces (name) VALUES ('coordinator');
INSERT INTO scalardb.namespaces (name) VALUES ('scalardb');
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void execute() {
}

if (isDuplicated) {
logError("dupilication happened !");
logError("Duplication happened !");
throw new PostProcessException("Inconsistency happened!");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static int getMaxRevision(List<Result> results) {
return maxRevision.orElse(0);
}

private static int getRevisionFromResult(Result result) {
public static int getRevisionFromResult(Result result) {
return result.getInt(REVISION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.TransactionCrudOperable;
import com.scalar.db.exception.transaction.TransactionException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.kelpie.config.Config;
import com.scalar.kelpie.exception.ProcessFatalException;
import com.scalar.kelpie.modules.TimeBasedProcessor;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.json.Json;
import kelpie.scalardb.Common;

Expand All @@ -21,6 +27,7 @@ public class SensorProcessor extends TimeBasedProcessor {
private final int numDevices;
private final AtomicBoolean isVerification;
private final int startTimestamp;
private final AtomicInteger numAttempts = new AtomicInteger();

public SensorProcessor(Config config) {
super(config);
Expand Down Expand Up @@ -67,13 +74,43 @@ public void close() {

private void updateRevision(DistributedTransaction transaction, int timestamp, int deviceId)
throws TransactionException {

Scan scan = SensorCommon.prepareScan(timestamp);
List<Result> results = transaction.scan(scan);

boolean hasDuplicatedRevision = SensorCommon.hasDuplicatedRevision(results);
boolean hasDuplicatedRevision;
List<Result> results;

// Alternate between scan() and getScanner() based on the attempt count.
boolean scannerUsed = numAttempts.getAndIncrement() % 2 == 0;
if (!scannerUsed) {
// Use scan()
results = transaction.scan(scan);
hasDuplicatedRevision = SensorCommon.hasDuplicatedRevision(results);
} else {
// Use getScanner()
hasDuplicatedRevision = false;
results = new ArrayList<>();
try (TransactionCrudOperable.Scanner scanner = transaction.getScanner(scan)) {
Set<Integer> tempSet = new HashSet<>();
while (true) {
Optional<Result> result = scanner.one();
if (!result.isPresent()) {
break;
}

int revision = SensorCommon.getRevisionFromResult(result.get());
if (!tempSet.add(revision)) {
hasDuplicatedRevision = true;
break;
}

results.add(result.get());
}
}
}

if (hasDuplicatedRevision) {
throw new ProcessFatalException("A revision is duplicated at " + timestamp);
throw new ProcessFatalException(
"A revision is duplicated. timestamp: " + timestamp + "; scannerUsed: " + scannerUsed);
}

int revision = SensorCommon.getMaxRevision(results) + 1;
Expand Down