Skip to content

Commit

Permalink
In CassandraStorage replace the table scan on repair_run with a asy…
Browse files Browse the repository at this point in the history
…nc break-down of per cluster run-throughs of known run IDs.

 ref: #105
  • Loading branch information
michaelsembwever committed May 26, 2017
1 parent b49248e commit 791c6a8
Showing 1 changed file with 32 additions and 29 deletions.
61 changes: 32 additions & 29 deletions src/main/java/com/spotify/reaper/storage/CassandraStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.base.Optional;
import com.google.common.collect.ComparisonChain;
Expand Down Expand Up @@ -51,6 +50,8 @@
import org.joda.time.DateTime;

import io.dropwizard.setup.Environment;
import java.util.function.Function;
import java.util.stream.Stream;
import systems.composable.dropwizard.cassandra.CassandraFactory;

public final class CassandraStorage implements IStorage {
Expand All @@ -62,7 +63,6 @@ public final class CassandraStorage implements IStorage {
private static final String SELECT_CLUSTER = "SELECT * FROM cluster";
private static final String SELECT_REPAIR_SCHEDULE = "SELECT * FROM repair_schedule";
private static final String SELECT_REPAIR_UNIT = "SELECT * FROM repair_unit";
private static final String SELECT_REPAIR_RUN = "SELECT * FROM repair_run";

/* prepared statements */
private PreparedStatement insertClusterPrepStmt;
Expand Down Expand Up @@ -90,7 +90,7 @@ public final class CassandraStorage implements IStorage {
private PreparedStatement deleteRepairScheduleByClusterAndKsPrepStmt;

public CassandraStorage(ReaperApplicationConfiguration config, Environment environment) {

CassandraFactory cassandraFactory = config.getCassandraFactory();
// all INSERT and DELETE statement prepared in this class are idempotent
cassandraFactory.setQueryOptions(java.util.Optional.of(new QueryOptions().setDefaultIdempotence(true)));
Expand Down Expand Up @@ -232,19 +232,19 @@ public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builde
@Override
public boolean updateRepairRun(RepairRun repairRun) {
session.executeAsync(insertRepairRunPrepStmt.bind(
repairRun.getId(),
repairRun.getClusterName(),
repairRun.getRepairUnitId(),
repairRun.getCause(),
repairRun.getOwner(),
repairRun.getRunState().toString(),
repairRun.getCreationTime(),
repairRun.getStartTime(),
repairRun.getEndTime(),
repairRun.getPauseTime(),
repairRun.getIntensity(),
repairRun.getLastEvent(),
repairRun.getSegmentCount(),
repairRun.getId(),
repairRun.getClusterName(),
repairRun.getRepairUnitId(),
repairRun.getCause(),
repairRun.getOwner(),
repairRun.getRunState().toString(),
repairRun.getCreationTime(),
repairRun.getStartTime(),
repairRun.getEndTime(),
repairRun.getPauseTime(),
repairRun.getIntensity(),
repairRun.getLastEvent(),
repairRun.getSegmentCount(),
repairRun.getRepairParallelism().toString()));
return true;
}
Expand Down Expand Up @@ -316,17 +316,20 @@ private Collection<RepairRun> getRepairRunsAsync(List<ResultSetFuture> repairRun

@Override
public Collection<RepairRun> getRepairRunsWithState(RunState runState) {
// There shouldn't be many repair runs, so we'll brute force this one
// We'll switch to 2i if performance sucks IRL
Collection<RepairRun> repairRuns = Lists.<RepairRun>newArrayList();
ResultSet repairRunResults = session.execute(SELECT_REPAIR_RUN);
for(Row repairRun:repairRunResults){
if(RunState.valueOf(repairRun.getString("state")).equals(runState)){
repairRuns.add(buildRepairRunFromRow(repairRun, repairRun.getUUID("id")));
}
}

return repairRuns;
return getClusters().stream()
// Grab all ids for the given cluster name
.map((cluster) -> getRepairRunIdsForCluster(cluster.getName()))
// Grab repair runs asynchronously for all the ids returned by the index table
.flatMap((Collection<UUID> repairRunIds)
-> repairRunIds.stream()
.map((repairRunId) -> session.executeAsync(getRepairRunPrepStmt.bind(repairRunId))))
// wait for results
.map((ResultSetFuture future) -> {
Row repairRunResult = future.getUninterruptibly().one();
return buildRepairRunFromRow(repairRunResult, repairRunResult.getUUID("id"));})
// filter on runState
.filter((RepairRun t) -> t.getRunState() == runState)
.collect(Collectors.toSet());
}

@Override
Expand Down Expand Up @@ -613,7 +616,7 @@ public Collection<RepairSchedule> getAllRepairSchedules() {
for(Row scheduleRow:scheduleResults){
schedules.add(createRepairScheduleFromRow(scheduleRow));
}

return schedules;
}

Expand Down Expand Up @@ -663,7 +666,7 @@ public Optional<RepairSchedule> deleteRepairSchedule(UUID id) {

session.executeAsync(deleteRepairScheduleByClusterAndKsPrepStmt
.bind(" ", repairUnit.getKeyspaceName(), repairSchedule.get().getId()));

session.executeAsync(deleteRepairSchedulePrepStmt.bind(repairSchedule.get().getId()));
}

Expand Down

0 comments on commit 791c6a8

Please sign in to comment.