Skip to content

Commit

Permalink
In CassandraStorage implement segments as clustering keys within the …
Browse files Browse the repository at this point in the history
…repair_run table.

Change required in IStorage so to identify a segment both by runId and segmentId.

ref:
 - #94
 - #102
  • Loading branch information
michaelsembwever committed May 23, 2017
1 parent 2a778bd commit d1267a5
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 236 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<dropwizard.version>1.1.0</dropwizard.version>
<dropwizard.version>1.0.7</dropwizard.version>
<dropwizard.cassandra.version>4.1.0</dropwizard.cassandra.version>
<cassandra.version>2.2.7</cassandra.version>
<cucumber.version>1.2.5</cucumber.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void setCassandraFactory(CassandraFactory cassandra) {
}

public Boolean getAllowUnreachableNodes() {
return allowUnreachableNodes;
return allowUnreachableNodes != null ? allowUnreachableNodes : false;
}

public void setAllowUnreachableNodes(Boolean allow) {
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private void startNextSegment() throws ReaperException {

// Just checking that no currently running segment runner is stuck.
RepairSegment supposedlyRunningSegment =
context.storage.getRepairSegment(currentlyRunningSegments.get(rangeIndex)).get();
context.storage.getRepairSegment(repairRunId, currentlyRunningSegments.get(rangeIndex)).get();
DateTime startTime = supposedlyRunningSegment.getStartTime();
if (startTime != null && startTime.isBefore(DateTime.now().minusDays(1))) {
LOG.warn("Looks like segment #{} has been running more than a day. Start time: {}",
Expand Down Expand Up @@ -343,7 +343,8 @@ private boolean repairSegment(final int rangeIndex, final UUID segmentId, RingRa
}
}
else {
potentialCoordinators = Arrays.asList(context.storage.getRepairSegment(segmentId).get().getCoordinatorHost());
potentialCoordinators
= Arrays.asList(context.storage.getRepairSegment(repairRunId, segmentId).get().getCoordinatorHost());
}

SegmentRunner segmentRunner = new SegmentRunner(context, segmentId, potentialCoordinators,
Expand All @@ -369,7 +370,7 @@ public void onFailure(Throwable t) {
}

private void handleResult(UUID segmentId) {
RepairSegment segment = context.storage.getRepairSegment(segmentId).get();
RepairSegment segment = context.storage.getRepairSegment(repairRunId, segmentId).get();
RepairSegment.State segmentState = segment.getState();
LOG.debug("In repair run #{}, triggerRepair on segment {} ended with state {}",
repairRunId, segmentId, segmentState);
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public final class SegmentRunner implements RepairStatusHandler, Runnable {
public SegmentRunner(AppContext context, UUID segmentId, Collection<String> potentialCoordinators,
long timeoutMillis, double intensity, RepairParallelism validationParallelism,
String clusterName, RepairUnit repairUnit, RepairRunner repairRunner) {

assert !segmentRunners.containsKey(segmentId) : "SegmentRunner already exists for segment with ID: " + segmentId;
this.context = context;
this.segmentId = segmentId;
this.potentialCoordinators = potentialCoordinators;
Expand All @@ -93,7 +95,7 @@ public SegmentRunner(AppContext context, UUID segmentId, Collection<String> pote

@Override
public void run() {
final RepairSegment segment = context.storage.getRepairSegment(segmentId).get();
final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
Thread.currentThread().setName(clusterName + ":" + segment.getRunId() + ":" + segmentId);

runRepair();
Expand Down Expand Up @@ -129,7 +131,7 @@ public static void abort(AppContext context, RepairSegment segment, JmxProxy jmx
*/
public void postponeCurrentSegment() {
synchronized (condition) {
RepairSegment segment = context.storage.getRepairSegment(segmentId).get();
RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
postpone(context, segment, context.storage.getRepairUnit(segment.getRepairUnitId()));
}
}
Expand All @@ -149,7 +151,7 @@ private long getOpenFilesAmount() {

private void runRepair() {
LOG.debug("Run repair for segment #{}", segmentId);
final RepairSegment segment = context.storage.getRepairSegment(segmentId).get();
final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
try (JmxProxy coordinator = context.jmxConnectionFactory
.connectAny(Optional.<RepairStatusHandler>fromNullable(this), potentialCoordinators)) {

Expand Down Expand Up @@ -220,7 +222,7 @@ protected Set<String> initialize() {
} catch (InterruptedException e) {
LOG.warn("Repair command {} on segment {} interrupted", commandId, segmentId, e);
} finally {
RepairSegment resultingSegment = context.storage.getRepairSegment(segmentId).get();
RepairSegment resultingSegment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
LOG.info("Repair command {} on segment {} returned with state {}", commandId, segmentId,
resultingSegment.getState());
if (resultingSegment.getState() == RepairSegment.State.RUNNING) {
Expand Down Expand Up @@ -382,7 +384,7 @@ private void abort(RepairSegment segment, JmxProxy jmxConnection) {
*/
@Override
public void handle(int repairNumber, Optional<ActiveRepairService.Status> status, Optional<ProgressEventType> progress, String message) {
final RepairSegment segment = context.storage.getRepairSegment(segmentId).get();
final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
Thread.currentThread().setName(clusterName + ":" + segment.getRunId() + ":" + segmentId);
LOG.debug(
"handle called for repairCommandId {}, outcome {} / {} and message: {}",
Expand All @@ -396,7 +398,7 @@ public void handle(int repairNumber, Optional<ActiveRepairService.Status> status
boolean failOutsideSynchronizedBlock = false;
// DO NOT ADD EXTERNAL CALLS INSIDE THIS SYNCHRONIZED BLOCK (JMX PROXY ETC)
synchronized (condition) {
RepairSegment currentSegment = context.storage.getRepairSegment(segmentId).get();
RepairSegment currentSegment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
// See status explanations at: https://wiki.apache.org/cassandra/RepairAsyncAPI
// Old repair API
if(status.isPresent()) {
Expand Down Expand Up @@ -524,7 +526,7 @@ public static String parseRepairId(String message) {
* @return the delay in milliseconds.
*/
long intensityBasedDelayMillis(double intensity) {
RepairSegment repairSegment = context.storage.getRepairSegment(segmentId).get();
RepairSegment repairSegment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
if (repairSegment.getEndTime() == null && repairSegment.getStartTime() == null) {
return 0;
} else if (repairSegment.getEndTime() != null && repairSegment.getStartTime() != null) {
Expand Down
Loading

0 comments on commit d1267a5

Please sign in to comment.