diff --git a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java index e5e88865c..ea70ea76f 100644 --- a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java +++ b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java @@ -9,6 +9,8 @@ import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -22,6 +24,7 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -54,6 +57,10 @@ public class CassandraStorage implements IStorage { com.datastax.driver.core.Cluster cassandra = null; Session session; + /** simple cache of repair_id. + * not accurate, only provides a floor value to shortcut looking for next appropriate id */ + private final ConcurrentMap repairIds = new ConcurrentHashMap<>(); + /* Simple statements */ private final String getClustersStmt = "SELECT * FROM cluster"; @@ -84,6 +91,9 @@ public class CassandraStorage implements IStorage { private PreparedStatement deleteRepairScheduleByClusterAndKsPrepStmt; private PreparedStatement deleteRepairSegmentPrepStmt; private PreparedStatement deleteRepairSegmentByRunId; + private PreparedStatement insertRepairId; + private PreparedStatement selectRepairId; + private PreparedStatement updateRepairId; public CassandraStorage(ReaperApplicationConfiguration config, Environment environment) { cassandra = config.getCassandraFactory().build(environment); @@ -126,6 +136,9 @@ private void prepareStatements(){ getRepairScheduleByClusterAndKsPrepStmt = session.prepare("SELECT repair_schedule_id FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ?"); deleteRepairSchedulePrepStmt = session.prepare("DELETE FROM repair_schedule WHERE id = ?"); deleteRepairScheduleByClusterAndKsPrepStmt = session.prepare("DELETE FROM repair_schedule_by_cluster_and_keyspace WHERE cluster_name = ? and keyspace_name = ? and repair_schedule_id = ?"); + insertRepairId = session.prepare("INSERT INTO repair_id (id_type, id) VALUES(?, 0) IF NOT EXISTS"); + selectRepairId = session.prepare("SELECT id FROM repair_id WHERE id_type = ?"); + updateRepairId = session.prepare("UPDATE repair_id SET id=? WHERE id_type =? IF id = ?"); } @Override @@ -707,26 +720,33 @@ public Collection getClusterScheduleStatuses(String cluste } public long getNewRepairId(String idType){ - long idValue = 0; - - // Create id counter if it doesn't exist yet - session.execute("INSERT INTO repair_id (id_type, id) VALUES('" + idType + "', 0) IF NOT EXISTS"); + if (!repairIds.containsKey(idType)){ + repairIds.putIfAbsent(idType, 0L); + // Create id counter if it doesn't exist yet + session.execute(insertRepairId.bind(idType)); + } + long idValue = repairIds.get(idType); + int attempts = 0; - // Get current value of the counter, increment and perform CAS + // Increment and perform CAS, if it fails then fetch current value of the counter and repeat while(true){ - idValue = session.execute("SELECT id FROM repair_id WHERE id_type = '" + idType + "'").one().getLong("id"); idValue++; - ResultSet casResult = session.execute("UPDATE repair_id SET id=" + idValue + " WHERE id_type = '" + idType + "' IF id = " + (idValue-1)); + ResultSet casResult = session.execute(updateRepairId.bind(idValue, idType, (idValue-1))); if(casResult.wasApplied()){ - break; + break; + }else{ + idValue = session.execute(selectRepairId.bind(idType)).one().getLong("id"); + Preconditions.checkState(idValue < Long.MAX_VALUE); + attempts++; + if(10 <= attempts && 0 == attempts % 10){ + LOG.warn("still cant find a new repairId after " + attempts + " attempts"); + } } } - + repairIds.put(idType, Math.max(idValue, repairIds.get(idType))); return idValue; } - - private RepairRun buildRepairRunFromRow(Row repairRunResult, long id){ return new RepairRun.Builder(repairRunResult.getString("cluster_name"), repairRunResult.getLong("repair_unit_id"), new DateTime(repairRunResult.getTimestamp("creation_time")), repairRunResult.getDouble("intensity"), repairRunResult.getInt("segment_count"), RepairParallelism.fromName(repairRunResult.getString("repair_parallelism"))) .cause(repairRunResult.getString("cause"))