diff --git a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java index 275adfa15..1c7f4f4af 100644 --- a/src/main/java/com/spotify/reaper/storage/CassandraStorage.java +++ b/src/main/java/com/spotify/reaper/storage/CassandraStorage.java @@ -22,7 +22,6 @@ import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; -import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.utils.UUIDs; import com.google.common.base.Optional; import com.google.common.collect.ComparisonChain; @@ -51,6 +50,8 @@ import org.joda.time.DateTime; import io.dropwizard.setup.Environment; +import java.util.function.Function; +import java.util.stream.Stream; import systems.composable.dropwizard.cassandra.CassandraFactory; public final class CassandraStorage implements IStorage { @@ -62,7 +63,6 @@ public final class CassandraStorage implements IStorage { private static final String SELECT_CLUSTER = "SELECT * FROM cluster"; private static final String SELECT_REPAIR_SCHEDULE = "SELECT * FROM repair_schedule"; private static final String SELECT_REPAIR_UNIT = "SELECT * FROM repair_unit"; - private static final String SELECT_REPAIR_RUN = "SELECT * FROM repair_run"; /* prepared statements */ private PreparedStatement insertClusterPrepStmt; @@ -90,7 +90,7 @@ public final class CassandraStorage implements IStorage { private PreparedStatement deleteRepairScheduleByClusterAndKsPrepStmt; public CassandraStorage(ReaperApplicationConfiguration config, Environment environment) { - + CassandraFactory cassandraFactory = config.getCassandraFactory(); // all INSERT and DELETE statement prepared in this class are idempotent cassandraFactory.setQueryOptions(java.util.Optional.of(new QueryOptions().setDefaultIdempotence(true))); @@ -232,19 +232,19 @@ public RepairRun addRepairRun(Builder repairRun, Collection getRepairRunsAsync(List repairRun @Override public Collection getRepairRunsWithState(RunState runState) { - // There shouldn't be many repair runs, so we'll brute force this one - // We'll switch to 2i if performance sucks IRL - Collection repairRuns = Lists.newArrayList(); - ResultSet repairRunResults = session.execute(SELECT_REPAIR_RUN); - for(Row repairRun:repairRunResults){ - if(RunState.valueOf(repairRun.getString("state")).equals(runState)){ - repairRuns.add(buildRepairRunFromRow(repairRun, repairRun.getUUID("id"))); - } - } - - return repairRuns; + return getClusters().stream() + // Grab all ids for the given cluster name + .map((cluster) -> getRepairRunIdsForCluster(cluster.getName())) + // Grab repair runs asynchronously for all the ids returned by the index table + .flatMap((Collection repairRunIds) + -> repairRunIds.stream() + .map((repairRunId) -> session.executeAsync(getRepairRunPrepStmt.bind(repairRunId)))) + // wait for results + .map((ResultSetFuture future) -> { + Row repairRunResult = future.getUninterruptibly().one(); + return buildRepairRunFromRow(repairRunResult, repairRunResult.getUUID("id"));}) + // filter on runState + .filter((RepairRun t) -> t.getRunState() == runState) + .collect(Collectors.toSet()); } @Override @@ -613,7 +616,7 @@ public Collection getAllRepairSchedules() { for(Row scheduleRow:scheduleResults){ schedules.add(createRepairScheduleFromRow(scheduleRow)); } - + return schedules; } @@ -663,7 +666,7 @@ public Optional deleteRepairSchedule(UUID id) { session.executeAsync(deleteRepairScheduleByClusterAndKsPrepStmt .bind(" ", repairUnit.getKeyspaceName(), repairSchedule.get().getId())); - + session.executeAsync(deleteRepairSchedulePrepStmt.bind(repairSchedule.get().getId())); }