Skip to content

Commit

Permalink
Bugfix/readindex concurrent bug (#121)
Browse files Browse the repository at this point in the history
* (bugfix) fix readindex concurrent bug #120

* (bugfix) fix readindex concurrent bug #120

* (fix) fix travis build failed with openjdk11 (#122)
  • Loading branch information
fengjiachun authored and killme2008 committed Apr 21, 2019
1 parent 7c4b0ae commit 83e3389
Showing 1 changed file with 43 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@

/**
* Read-only service implementation.
* @author dennis
*
* @author dennis
*/
public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {

/** disruptor to run readonly service. */
/** Disruptor to run readonly service. */
private Disruptor<ReadIndexEvent> readIndexDisruptor;
private RingBuffer<ReadIndexEvent> readIndexQueue;
private RaftOptions raftOptions;
Expand Down Expand Up @@ -90,20 +90,21 @@ public ReadIndexEvent newInstance() {

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
// task list for batch
private List<ReadIndexEvent> events = new ArrayList<>(raftOptions.getApplyBatch());
private List<ReadIndexEvent> events = new ArrayList<>(ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

@Override
public void onEvent(ReadIndexEvent newEvent, long sequence, boolean endOfBatch) throws Exception {
if (newEvent.shutdownLatch != null) {
executeReadIndexEvents(events);
executeReadIndexEvents(this.events);
this.events.clear();
newEvent.shutdownLatch.countDown();
return;
}

events.add(newEvent);
if (events.size() >= raftOptions.getApplyBatch() || endOfBatch) {
executeReadIndexEvents(events);
events = new ArrayList<>(raftOptions.getApplyBatch());
this.events.add(newEvent);
if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
executeReadIndexEvents(this.events);
this.events.clear();
}
}
}
Expand All @@ -115,14 +116,11 @@ public void onEvent(ReadIndexEvent newEvent, long sequence, boolean endOfBatch)
*/
class ReadIndexResponseClosure extends RpcResponseClosureAdapter<ReadIndexResponse> {

final List<ReadIndexEvent> events;
final List<ReadIndexState> states;
final ReadIndexRequest request;

public ReadIndexResponseClosure(List<ReadIndexEvent> events, List<ReadIndexState> states,
final ReadIndexRequest request) {
public ReadIndexResponseClosure(List<ReadIndexState> states, ReadIndexRequest request) {
super();
this.events = events;
this.states = states;
this.request = request;
}
Expand All @@ -131,7 +129,7 @@ public ReadIndexResponseClosure(List<ReadIndexEvent> events, List<ReadIndexState
* Called when ReadIndex response returns.
*/
@Override
public void run(Status status) {
public void run(final Status status) {
if (!status.isOk()) {
notifyFail(status);
return;
Expand All @@ -141,52 +139,54 @@ public void run(Status status) {
notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
return;
}
//Success
final ReadIndexStatus readIndexStatus = new ReadIndexStatus(states, request, readIndexResponse.getIndex());
for (final ReadIndexState state : states) {
//Records current commit log index.
// Success
final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request, readIndexResponse.getIndex());
for (final ReadIndexState state : this.states) {
// Records current commit log index.
state.setIndex(readIndexResponse.getIndex());
}

boolean doUnlock = true;
lock.lock();
ReadOnlyServiceImpl.this.lock.lock();
try {
if (readIndexStatus.isApplied(fsmCaller.getLastAppliedIndex())) {
if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
// Already applied,notify readIndex request.
lock.unlock();
ReadOnlyServiceImpl.this.lock.unlock();
doUnlock = false;
notifySuccess(readIndexStatus);
} else {
// Not applied, add it to pending-notify cache.
pendingNotifyStatus.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10))
ReadOnlyServiceImpl.this.pendingNotifyStatus.computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10))
.add(readIndexStatus);
}
} finally {
if (doUnlock) {
lock.unlock();
ReadOnlyServiceImpl.this.lock.unlock();
}
}
}

private void notifyFail(Status status) {
private void notifyFail(final Status status) {
final long nowMs = Utils.monotonicMs();
for (final ReadIndexEvent event : events) {
node.getNodeMetrics().recordLatency("read-index", nowMs - event.startTime);
if (event.done != null) {
event.done.run(status, ReadIndexClosure.INVALID_LOG_INDEX, event.requestContext.get());
for (final ReadIndexState state : this.states) {
ReadOnlyServiceImpl.this.node.getNodeMetrics().recordLatency("read-index",
nowMs - state.getStartTimeMs());
final ReadIndexClosure done = state.getDone();
if (done != null) {
final Bytes reqCtx = state.getRequestContext();
done.run(status, ReadIndexClosure.INVALID_LOG_INDEX, reqCtx != null ? reqCtx.get() : null);
}
}
}

}

private void executeReadIndexEvents(List<ReadIndexEvent> events) {
private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
if (events.isEmpty()) {
return;
}
final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder();
rb.setGroupId(node.getGroupId());
rb.setServerId(node.getServerId().toString());
final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
.setGroupId(this.node.getGroupId()) //
.setServerId(this.node.getServerId().toString());

final List<ReadIndexState> states = new ArrayList<>(events.size());

Expand All @@ -196,28 +196,28 @@ private void executeReadIndexEvents(List<ReadIndexEvent> events) {
}
final ReadIndexRequest request = rb.build();

node.handleReadIndexRequest(request, new ReadIndexResponseClosure(events, states, request));
this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

@Override
public boolean init(ReadOnlyServiceOptions opts) {
public boolean init(final ReadOnlyServiceOptions opts) {
this.node = opts.getNode();
this.fsmCaller = opts.getFsmCaller();
this.raftOptions = opts.getRaftOptions();

this.scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
this.readIndexDisruptor = new Disruptor<>(new ReadIndexEventFactory(), raftOptions.getDisruptorBufferSize(),
this.readIndexDisruptor = new Disruptor<>(new ReadIndexEventFactory(), this.raftOptions.getDisruptorBufferSize(),
new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true));
this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
this.readIndexDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
this.readIndexDisruptor.start();
this.readIndexQueue = this.readIndexDisruptor.getRingBuffer();

//listen on lastAppliedLogIndex change events.
// listen on lastAppliedLogIndex change events.
this.fsmCaller.addLastAppliedLogIndexListener(this);

//start scanner
// start scanner
this.scheduledExecutorService.scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
return true;
Expand All @@ -243,9 +243,9 @@ public void join() throws InterruptedException {
}

@Override
public void addRequest(byte[] reqCtx, ReadIndexClosure closure) {
public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
if (this.shutdownLatch != null) {
Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "was stopped"));
Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
throw new IllegalStateException("Service already shutdown.");
}
try {
Expand All @@ -260,15 +260,15 @@ public void addRequest(byte[] reqCtx, ReadIndexClosure closure) {
}

/**
* Called when lastAppliedIndex updates
* Called when lastAppliedIndex updates.
*
* @param appliedIndex applied index
*/
@Override
public void onApplied(long appliedIndex) {
public void onApplied(final long appliedIndex) {
// TODO reuse pendingStatuses list?
List<ReadIndexStatus> pendingStatuses = null;
lock.lock();
this.lock.lock();
try {
if (this.pendingNotifyStatus.isEmpty()) {
return;
Expand All @@ -288,7 +288,7 @@ public void onApplied(long appliedIndex) {

}
} finally {
lock.unlock();
this.lock.unlock();
if (pendingStatuses != null && !pendingStatuses.isEmpty()) {
for (final ReadIndexStatus status : pendingStatuses) {
notifySuccess(status);
Expand Down Expand Up @@ -326,5 +326,4 @@ private void notifySuccess(final ReadIndexStatus status) {
}
}
}

}

0 comments on commit 83e3389

Please sign in to comment.