Skip to content
This repository has been archived by the owner on Mar 31, 2022. It is now read-only.

Add atomic modifyRepairRun method to storage #96

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/main/java/com/spotify/reaper/resources/CommonTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ private static void storeNewRepairSegments(AppContext context, List<RingRange> t
repairSegmentBuilders.add(repairSegment);
}
context.storage.addRepairSegments(repairSegmentBuilders, repairRun.getId());
if (repairRun.getSegmentCount() != tokenSegments.size()) {
LOG.debug("created segment amount differs from expected default {} != {}",
repairRun.getSegmentCount(), tokenSegments.size());
context.storage.updateRepairRun(
repairRun.with().segmentCount(tokenSegments.size()).build(repairRun.getId()));
}
}

/**
Expand Down
82 changes: 48 additions & 34 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLongArray;

import javax.annotation.Nullable;

public class RepairRunner implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(RepairRunner.class);
Expand Down Expand Up @@ -163,15 +165,15 @@ public void run() {
LOG.error(Arrays.toString(e.getStackTrace()));
e.printStackTrace();
synchronized (this) {
Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
if (repairRun.isPresent()) {
context.storage.updateRepairRun(repairRun.get()
.with()
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("Exception: %s", e.getMessage()))
.endTime(DateTime.now())
.build(repairRunId));
}
context.storage.modifyRepairRun(repairRunId, new Function<RepairRun.Builder, RepairRun.Builder>() {
@Override
public RepairRun.Builder apply(RepairRun.Builder original) {
return original
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("Exception: %s", e.getMessage()))
.endTime(DateTime.now());
}
});
context.repairManager.removeRunner(this);
}
}
Expand All @@ -182,13 +184,14 @@ public void run() {
*/
private void start() throws ReaperException {
LOG.info("Repairs for repair run #{} starting", repairRunId);
// First write: fetch the run and store a copy marked RUNNING with the current time as its
// start time, while holding this runner's monitor. Optional.get() is called without an
// isPresent() check, so this path relies on the run existing in storage.
synchronized (this) {
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
context.storage.updateRepairRun(repairRun.with()
.runState(RepairRun.RunState.RUNNING)
.startTime(DateTime.now())
.build(repairRun.getId()));
}
// Second write: the same two fields are set again through modifyRepairRun, which hands a
// builder to the callback instead of fetching the run here.
// NOTE(review): both write paths appear back to back; this looks like the pre-change code and
// its replacement side by side -- confirm that only one of them is meant to remain.
context.storage.modifyRepairRun(repairRunId, new Function<RepairRun.Builder, RepairRun.Builder>() {
@Override
public RepairRun.Builder apply(RepairRun.Builder original) {
return original
.runState(RepairRun.RunState.RUNNING)
.startTime(DateTime.now());
}
});
// With the run marked RUNNING, hand over to startNextSegment().
startNextSegment();
}

Expand All @@ -198,12 +201,16 @@ private void start() throws ReaperException {
private void end() {
LOG.info("Repairs for repair run #{} done", repairRunId);
// Everything below runs while holding this runner's monitor.
synchronized (this) {
// First write: fetch the run and store a copy marked DONE with the current time as its end
// time and "All done" as last event. Optional.get() is called without an isPresent() check.
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
context.storage.updateRepairRun(repairRun.with()
.runState(RepairRun.RunState.DONE)
.endTime(DateTime.now())
.lastEvent("All done")
.build(repairRun.getId()));
// Second write: the same three fields are set again through modifyRepairRun.
// NOTE(review): both write paths appear back to back; this looks like the pre-change code and
// its replacement side by side -- confirm that only one of them is meant to remain.
context.storage.modifyRepairRun(repairRunId,
new Function<RepairRun.Builder, RepairRun.Builder>() {
@Override
public RepairRun.Builder apply(RepairRun.Builder original) {
return original
.runState(RepairRun.RunState.DONE)
.endTime(DateTime.now())
.lastEvent("All done");
}
});
// The run is finished, so unregister this runner from the repair manager.
context.repairManager.removeRunner(this);
}
}
Expand Down Expand Up @@ -255,7 +262,8 @@ private void startNextSegment() throws ReaperException {
* @param segmentId id of the segment to repair.
* @param tokenRange token range of the segment to repair.
*/
private void repairSegment(final int rangeIndex, final long segmentId, RingRange tokenRange) throws ReaperException {
private void repairSegment(final int rangeIndex, final long segmentId, final RingRange tokenRange)
throws ReaperException {
final long unitId;
final double intensity;
final RepairParallelism validationParallelism;
Expand Down Expand Up @@ -289,13 +297,15 @@ private void repairSegment(final int rangeIndex, final long segmentId, RingRange
if (potentialCoordinators.isEmpty()) {
// This segment has a faulty token range. Abort the entire repair run.
synchronized (this) {
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
context.storage.updateRepairRun(repairRun
.with()
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("No coordinators for range %s", tokenRange.toString()))
.endTime(DateTime.now())
.build(repairRunId));
context.storage.modifyRepairRun(repairRunId, new Function<RepairRun.Builder, RepairRun.Builder>() {
@Override
public RepairRun.Builder apply(RepairRun.Builder original) {
return original
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("No coordinators for range %s", tokenRange.toString()))
.endTime(DateTime.now());
}
});
context.repairManager.removeRunner(this);
}
return;
Expand Down Expand Up @@ -344,16 +354,20 @@ private void handleResult(long segmentId) {
}
}

public void updateLastEvent(String newEvent) {
public void updateLastEvent(final String newEvent) {
synchronized (this) {
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
if (repairRun.getRunState().isTerminated()) {
LOG.warn("Will not update lastEvent of run that has already terminated. The message was: "
+ "\"{}\"", newEvent);
} else {
context.storage.updateRepairRun(repairRun.with()
.lastEvent(newEvent)
.build(repairRunId));
context.storage.modifyRepairRun(repairRunId, new Function<RepairRun.Builder, RepairRun.Builder>() {
@Override
public RepairRun.Builder apply(RepairRun.Builder original) {
return original
.lastEvent(newEvent);
}
});
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/spotify/reaper/storage/IStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.spotify.reaper.storage;

import com.google.common.base.Function;
import com.google.common.base.Optional;

import com.spotify.reaper.core.Cluster;
Expand Down Expand Up @@ -55,8 +56,11 @@ public interface IStorage {

RepairRun addRepairRun(RepairRun.Builder repairRun);

/**
 * Writes the given repair run to storage.
 *
 * <p>Callers build the argument from a run they fetched earlier, so the read and the write are
 * two separate storage operations.
 *
 * @param repairRun the repair run state to store
 * @return presumably whether the update was applied -- TODO confirm against the implementations
 * @deprecated use {@link #modifyRepairRun(long, Function)}, which takes the modification as a
 *     function so that the storage can perform the read and the write together.
 */
@Deprecated
boolean updateRepairRun(RepairRun repairRun);

/**
 * Applies {@code modification} to a builder derived from the stored repair run with the given
 * id and stores the run built from the returned builder under the same id.
 *
 * @param id id of the repair run to modify
 * @param modification receives a builder based on the currently stored run and returns the
 *     builder whose state should be stored
 * @return {@code false} if no repair run with {@code id} exists; otherwise the outcome of
 *     storing the modified run
 */
boolean modifyRepairRun(long id, Function<RepairRun.Builder, RepairRun.Builder> modification);

Optional<RepairRun> getRepairRun(long id);

Collection<RepairRun> getRepairRunsForCluster(String clusterName);
Expand Down
52 changes: 39 additions & 13 deletions src/main/java/com/spotify/reaper/storage/MemoryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.spotify.reaper.storage;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -105,6 +106,7 @@ public RepairRun addRepairRun(RepairRun.Builder repairRun) {
return newRepairRun;
}

@Deprecated
@Override
public boolean updateRepairRun(RepairRun repairRun) {
if (!getRepairRun(repairRun.getId()).isPresent()) {
Expand All @@ -115,6 +117,43 @@ public boolean updateRepairRun(RepairRun repairRun) {
}
}

private final ConcurrentMap<Long, Object> repairRunLocks = Maps.newConcurrentMap();

/**
 * Returns the object used as monitor for the repair run with the given id.
 *
 * <p>A fresh candidate is offered to {@code repairRunLocks} via {@code putIfAbsent}; whichever
 * object ends up mapped to the id is handed back, so callers asking for the same id get the
 * same instance for as long as that mapping stays in the map.
 *
 * @param id id of the repair run
 * @return the lock object registered for {@code id}
 */
private Object getRepairRunLock(long id) {
  final Object candidate = new Object();
  final Object registered = repairRunLocks.putIfAbsent(id, candidate);
  if (registered == null) {
    // Nothing was mapped before, so the candidate has just become the registered lock.
    return candidate;
  }
  return registered;
}

/**
 * Replaces the stored repair run with a modified copy while holding the lock for that run.
 *
 * <p>The current run is looked up in {@code repairRuns}, turned into a builder, passed through
 * {@code modification}, rebuilt with the same id and put back into the map.
 *
 * @param id id of the repair run to modify
 * @param modification function applied to the builder of the currently stored run
 * @return {@code false} if no run is stored under {@code id}, {@code true} once the modified
 *     run has been stored
 */
@Override
public boolean modifyRepairRun(long id,
    Function<RepairRun.Builder, RepairRun.Builder> modification) {
  synchronized (getRepairRunLock(id)) {
    final RepairRun current = repairRuns.get(id);
    if (current == null) {
      return false;
    }
    final RepairRun.Builder changed = modification.apply(current.with());
    repairRuns.put(id, changed.build(id));
    return true;
  }
}

/**
 * Removes the repair run with the given id while holding the lock for that run.
 *
 * <p>The run is taken out of {@code repairRuns} first. Only if it existed and none of its
 * segments is in state RUNNING are its repair unit and segments deleted as well, and the
 * returned copy marked DELETED. With running segments the run is still removed from the map,
 * but it is returned with its previous state and its unit and segments are kept.
 *
 * @param id id of the repair run to delete
 * @return the removed run (marked DELETED when the clean-up took place), or absent if no run
 *     was stored under {@code id}
 */
@Override
public Optional<RepairRun> deleteRepairRun(long id) {
  synchronized (getRepairRunLock(id)) {
    RepairRun outcome = repairRuns.remove(id);
    // Short-circuit keeps the segment query from running when there was nothing to remove.
    final boolean cleanUp = outcome != null
        && getSegmentAmountForRepairRunWithState(id, RepairSegment.State.RUNNING) == 0;
    if (cleanUp) {
      deleteRepairUnit(outcome.getRepairUnitId());
      deleteRepairSegmentsForRun(id);
      outcome = outcome.with().runState(RepairRun.RunState.DELETED).build(id);
    }
    return Optional.fromNullable(outcome);
  }
}

@Override
public Optional<RepairRun> getRepairRun(long id) {
return Optional.fromNullable(repairRuns.get(id));
Expand Down Expand Up @@ -193,19 +232,6 @@ private int deleteRepairSegmentsForRun(long runId) {
return segmentsMap != null ? segmentsMap.size() : 0;
}

/**
 * Removes the repair run with the given id from {@code repairRuns}.
 *
 * <p>If the run existed and none of its segments is in state RUNNING, its repair unit and
 * segments are deleted too and the returned copy is marked DELETED; otherwise the removed run
 * is returned with its previous state.
 *
 * <p>NOTE(review): another {@code deleteRepairRun(long)} with the same body wrapped in
 * {@code synchronized (getRepairRunLock(id))} appears earlier in this view -- this one looks
 * like the superseded version; confirm which definition is current.
 *
 * @param id id of the repair run to delete
 * @return the removed run, or absent if no run was stored under {@code id}
 */
@Override
public Optional<RepairRun> deleteRepairRun(long id) {
// The run is taken out of the map before the running-segment check below.
RepairRun deletedRun = repairRuns.remove(id);
if (deletedRun != null) {
// Unit and segments are only cleaned up when no segment of this run is RUNNING.
if (getSegmentAmountForRepairRunWithState(id, RepairSegment.State.RUNNING) == 0) {
deleteRepairUnit(deletedRun.getRepairUnitId());
deleteRepairSegmentsForRun(id);
deletedRun = deletedRun.with().runState(RepairRun.RunState.DELETED).build(id);
}
}
return Optional.fromNullable(deletedRun);
}

@Override
public RepairUnit addRepairUnit(RepairUnit.Builder repairUnit) {
Optional<RepairUnit> existing =
Expand Down
92 changes: 61 additions & 31 deletions src/main/java/com/spotify/reaper/storage/PostgresStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
*/
package com.spotify.reaper.storage;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import com.spotify.reaper.ReaperApplicationConfiguration;
import com.spotify.reaper.ReaperException;
Expand Down Expand Up @@ -45,6 +47,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import io.dropwizard.jdbi.DBIFactory;
import io.dropwizard.setup.Environment;
Expand Down Expand Up @@ -190,43 +193,69 @@ public Collection<RepairRun> getRepairRunsWithState(RepairRun.RunState runState)
return result == null ? Lists.<RepairRun>newArrayList() : result;
}


// Bj0rn: I don't quite like using this synchronization approach in the Postgres storage.
// Could transactions solve it in a better way?
private final ConcurrentMap<Long, Object> repairRunLocks = Maps.newConcurrentMap();

/**
 * Looks up, or registers on first use, the monitor object for the given repair run id.
 *
 * <p>The lock lives in this instance's {@code repairRunLocks} map; {@code putIfAbsent} decides
 * which object is kept when several callers ask for the same id at once.
 *
 * @param id id of the repair run
 * @return the lock object mapped to {@code id}
 */
private Object getRepairRunLock(long id) {
  final Object freshLock = new Object();
  final Object previousLock = repairRunLocks.putIfAbsent(id, freshLock);
  if (previousLock != null) {
    // A lock was already registered for this id; the fresh one is discarded.
    return previousLock;
  }
  return freshLock;
}

/**
 * Reads the repair run with the given id, applies {@code modification} to its builder and
 * writes the rebuilt run back through {@code updateRepairRun}, all while holding the lock
 * object for that id.
 *
 * <p>The lock comes from this instance's {@code repairRunLocks} map, so it only serializes
 * callers that go through this storage object.
 *
 * @param id id of the repair run to modify
 * @param modification function applied to the builder of the currently stored run
 * @return {@code false} if no run with {@code id} was found, otherwise the result of
 *     {@code updateRepairRun} for the modified run
 */
@Override
public boolean modifyRepairRun(long id,
    Function<RepairRun.Builder, RepairRun.Builder> modification) {
  synchronized (getRepairRunLock(id)) {
    final Optional<RepairRun> stored = getRepairRun(id);
    if (stored.isPresent()) {
      final RepairRun.Builder edited = modification.apply(stored.get().with());
      return updateRepairRun(edited.build(id));
    }
    return false;
  }
}

@Override
public Optional<RepairRun> deleteRepairRun(long id) {
RepairRun result = null;
Handle h = null;
try {
h = jdbi.open();
h.begin();
IStoragePostgreSQL pg = getPostgresStorage(h);
RepairRun runToDelete = pg.getRepairRun(id);
if (runToDelete != null) {
int segmentsRunning = pg.getSegmentAmountForRepairRunWithState(id,
RepairSegment.State.RUNNING);
if (segmentsRunning == 0) {
pg.deleteRepairSegmentsForRun(runToDelete.getId());
pg.deleteRepairRun(id);
result = runToDelete.with().runState(RepairRun.RunState.DELETED).build(id);
} else {
LOG.warn("not deleting RepairRun \"{}\" as it has segments running: {}",
id, segmentsRunning);
synchronized (getRepairRunLock(id)) {
RepairRun result = null;
Handle h = null;
try {
h = jdbi.open();
h.begin();
IStoragePostgreSQL pg = getPostgresStorage(h);
RepairRun runToDelete = pg.getRepairRun(id);
if (runToDelete != null) {
int segmentsRunning = pg.getSegmentAmountForRepairRunWithState(id,
RepairSegment.State.RUNNING);
if (segmentsRunning == 0) {
pg.deleteRepairSegmentsForRun(runToDelete.getId());
pg.deleteRepairRun(id);
result = runToDelete.with().runState(RepairRun.RunState.DELETED).build(id);
} else {
LOG.warn("not deleting RepairRun \"{}\" as it has segments running: {}",
id, segmentsRunning);
}
}
h.commit();
} catch (DBIException ex) {
LOG.warn("DELETE failed", ex);
ex.printStackTrace();
if (h != null) {
h.rollback();
}
} finally {
if (h != null) {
h.close();
}
}
h.commit();
} catch (DBIException ex) {
LOG.warn("DELETE failed", ex);
ex.printStackTrace();
if (h != null) {
h.rollback();
}
} finally {
if (h != null) {
h.close();
if (result != null) {
tryDeletingRepairUnit(result.getRepairUnitId());
}
return Optional.fromNullable(result);
}
if (result != null) {
tryDeletingRepairUnit(result.getRepairUnitId());
}
return Optional.fromNullable(result);
}

private void tryDeletingRepairUnit(long id) {
Expand All @@ -251,6 +280,7 @@ public RepairRun addRepairRun(RepairRun.Builder newRepairRun) {
return result;
}

@Deprecated
@Override
public boolean updateRepairRun(RepairRun repairRun) {
boolean result = false;
Expand Down
Loading