Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra performance: repair_id prepared statements against repair_id & cache sequence #95

Merged
merged 1 commit into from
May 9, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions src/main/java/com/spotify/reaper/storage/CassandraStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand All @@ -22,6 +24,7 @@
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -54,6 +57,10 @@ public class CassandraStorage implements IStorage {
com.datastax.driver.core.Cluster cassandra = null;
Session session;

/** simple cache of repair_id.
* not accurate, only provides a floor value to shortcut looking for next appropriate id */
private final ConcurrentMap<String,Long> repairIds = new ConcurrentHashMap<>();

/* Simple statements */
private final String getClustersStmt = "SELECT * FROM cluster";

Expand Down Expand Up @@ -84,6 +91,9 @@ public class CassandraStorage implements IStorage {
private PreparedStatement deleteRepairScheduleByClusterAndKsPrepStmt;
private PreparedStatement deleteRepairSegmentPrepStmt;
private PreparedStatement deleteRepairSegmentByRunId;
private PreparedStatement insertRepairId;
private PreparedStatement selectRepairId;
private PreparedStatement updateRepairId;

public CassandraStorage(ReaperApplicationConfiguration config, Environment environment) {
cassandra = config.getCassandraFactory().build(environment);
Expand Down Expand Up @@ -126,6 +136,9 @@ private void prepareStatements(){
getRepairScheduleByClusterAndKsPrepStmt = session.prepare("SELECT repair_schedule_id FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ?");
deleteRepairSchedulePrepStmt = session.prepare("DELETE FROM repair_schedule WHERE id = ?");
deleteRepairScheduleByClusterAndKsPrepStmt = session.prepare("DELETE FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ? and repair_schedule_id = ?");
insertRepairId = session.prepare("INSERT INTO repair_id (id_type, id) VALUES(?, 0) IF NOT EXISTS");
selectRepairId = session.prepare("SELECT id FROM repair_id WHERE id_type = ?");
updateRepairId = session.prepare("UPDATE repair_id SET id=? WHERE id_type =? IF id = ?");
}

@Override
Expand Down Expand Up @@ -707,26 +720,33 @@ public Collection<RepairScheduleStatus> getClusterScheduleStatuses(String cluste
}

public long getNewRepairId(String idType){
long idValue = 0;

// Create id counter if it doesn't exist yet
session.execute("INSERT INTO repair_id (id_type, id) VALUES('" + idType + "', 0) IF NOT EXISTS");
if (!repairIds.containsKey(idType)){
repairIds.putIfAbsent(idType, 0L);
// Create id counter if it doesn't exist yet
session.execute(insertRepairId.bind(idType));
}
long idValue = repairIds.get(idType);
int attempts = 0;

// Get current value of the counter, increment and perform CAS
// Increment and perform CAS, if it fails then fetch current value of the counter and repeat
while(true){
idValue = session.execute("SELECT id FROM repair_id WHERE id_type = '" + idType + "'").one().getLong("id");
idValue++;
ResultSet casResult = session.execute("UPDATE repair_id SET id=" + idValue + " WHERE id_type = '" + idType + "' IF id = " + (idValue-1));
ResultSet casResult = session.execute(updateRepairId.bind(idValue, idType, (idValue-1)));
if(casResult.wasApplied()){
break;
break;
}else{
idValue = session.execute(selectRepairId.bind(idType)).one().getLong("id");
Preconditions.checkState(idValue < Long.MAX_VALUE);
attempts++;
if(10 <= attempts && 0 == attempts % 10){
LOG.warn("still cant find a new repairId after " + attempts + " attempts");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAJOR Use the built-in formatting to construct this argument. rule

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of little concern. WARN is generally always enabled, and the frequency of this log line low.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MAJOR Use the built-in formatting to construct this argument. rule

}
}
}

repairIds.put(idType, Math.max(idValue, repairIds.get(idType)));
return idValue;
}



private RepairRun buildRepairRunFromRow(Row repairRunResult, long id){
return new RepairRun.Builder(repairRunResult.getString("cluster_name"), repairRunResult.getLong("repair_unit_id"), new DateTime(repairRunResult.getTimestamp("creation_time")), repairRunResult.getDouble("intensity"), repairRunResult.getInt("segment_count"), RepairParallelism.fromName(repairRunResult.getString("repair_parallelism")))
.cause(repairRunResult.getString("cause"))
Expand Down