Skip to content

Commit

Permalink
Cassandra performance: use prepared statements against repair_id, and…
Browse files Browse the repository at this point in the history
… cache a floor for the sequence number to reduce lookups.

ref:
 - #94
 - #95
  • Loading branch information
michaelsembwever committed May 9, 2017
1 parent 7032dde commit 504e937
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions src/main/java/com/spotify/reaper/storage/CassandraStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand All @@ -22,6 +24,7 @@
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -54,6 +57,10 @@ public class CassandraStorage implements IStorage {
com.datastax.driver.core.Cluster cassandra = null;
Session session;

/** simple cache of repair_id.
* not accurate, only provides a floor value to shortcut looking for next appropriate id */
private final ConcurrentMap<String,Long> repairIds = new ConcurrentHashMap<>();

/* Simple statements */
private final String getClustersStmt = "SELECT * FROM cluster";

Expand Down Expand Up @@ -84,6 +91,9 @@ public class CassandraStorage implements IStorage {
private PreparedStatement deleteRepairScheduleByClusterAndKsPrepStmt;
private PreparedStatement deleteRepairSegmentPrepStmt;
private PreparedStatement deleteRepairSegmentByRunId;
private PreparedStatement insertRepairId;
private PreparedStatement selectRepairId;
private PreparedStatement updateRepairId;

public CassandraStorage(ReaperApplicationConfiguration config, Environment environment) {
cassandra = config.getCassandraFactory().build(environment);
Expand Down Expand Up @@ -126,6 +136,9 @@ private void prepareStatements(){
getRepairScheduleByClusterAndKsPrepStmt = session.prepare("SELECT repair_schedule_id FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ?");
deleteRepairSchedulePrepStmt = session.prepare("DELETE FROM repair_schedule WHERE id = ?");
deleteRepairScheduleByClusterAndKsPrepStmt = session.prepare("DELETE FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ? and repair_schedule_id = ?");
insertRepairId = session.prepare("INSERT INTO repair_id (id_type, id) VALUES(?, 0) IF NOT EXISTS");
selectRepairId = session.prepare("SELECT id FROM repair_id WHERE id_type = ?");
updateRepairId = session.prepare("UPDATE repair_id SET id=? WHERE id_type =? IF id = ?");
}

@Override
Expand Down Expand Up @@ -707,26 +720,33 @@ public Collection<RepairScheduleStatus> getClusterScheduleStatuses(String cluste
}

public long getNewRepairId(String idType){
long idValue = 0;

// Create id counter if it doesn't exist yet
session.execute("INSERT INTO repair_id (id_type, id) VALUES('" + idType + "', 0) IF NOT EXISTS");
if (!repairIds.containsKey(idType)){
repairIds.putIfAbsent(idType, 0L);
// Create id counter if it doesn't exist yet
session.execute(insertRepairId.bind(idType));
}
long idValue = repairIds.get(idType);
int attempts = 0;

// Get current value of the counter, increment and perform CAS
// Increment and perform CAS, if it fails then fetch current value of the counter and repeat
while(true){
idValue = session.execute("SELECT id FROM repair_id WHERE id_type = '" + idType + "'").one().getLong("id");
idValue++;
ResultSet casResult = session.execute("UPDATE repair_id SET id=" + idValue + " WHERE id_type = '" + idType + "' IF id = " + (idValue-1));
ResultSet casResult = session.execute(updateRepairId.bind(idValue, idType, (idValue-1)));
if(casResult.wasApplied()){
break;
break;
}else{
idValue = session.execute(selectRepairId.bind(idType)).one().getLong("id");
Preconditions.checkState(idValue < Long.MAX_VALUE);
attempts++;
if(10 <= attempts && 0 == attempts % 10){
LOG.warn("still cant find a new repairId after " + attempts + " attempts");
}
}
}

repairIds.put(idType, Math.max(idValue, repairIds.get(idType)));
return idValue;
}



private RepairRun buildRepairRunFromRow(Row repairRunResult, long id){
return new RepairRun.Builder(repairRunResult.getString("cluster_name"), repairRunResult.getLong("repair_unit_id"), new DateTime(repairRunResult.getTimestamp("creation_time")), repairRunResult.getDouble("intensity"), repairRunResult.getInt("segment_count"), RepairParallelism.fromName(repairRunResult.getString("repair_parallelism")))
.cause(repairRunResult.getString("cause"))
Expand Down

0 comments on commit 504e937

Please sign in to comment.