Skip to content

Commit

Permalink
(bugfix) fix readindex concurrent bug #120
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Apr 19, 2019
1 parent 7239cb5 commit 52d0182
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
if (events.isEmpty()) {
return;
}
final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder();
rb.setGroupId(this.node.getGroupId());
rb.setServerId(this.node.getServerId().toString());
final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
.setGroupId(this.node.getGroupId()) //
.setServerId(this.node.getServerId().toString());

final List<ReadIndexState> states = new ArrayList<>(events.size());

Expand All @@ -207,17 +207,17 @@ public boolean init(final ReadOnlyServiceOptions opts) {

this.scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory("ReadOnlyService-PendingNotify-Scanner", true));
this.readIndexDisruptor = new Disruptor<>(new ReadIndexEventFactory(), raftOptions.getDisruptorBufferSize(),
this.readIndexDisruptor = new Disruptor<>(new ReadIndexEventFactory(), this.raftOptions.getDisruptorBufferSize(),
new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-", true));
this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
this.readIndexDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName()));
this.readIndexDisruptor.start();
this.readIndexQueue = this.readIndexDisruptor.getRingBuffer();

//listen on lastAppliedLogIndex change events.
// listen on lastAppliedLogIndex change events.
this.fsmCaller.addLastAppliedLogIndexListener(this);

//start scanner
// start scanner
this.scheduledExecutorService.scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
return true;
Expand All @@ -229,7 +229,7 @@ public synchronized void shutdown() {
return;
}
this.shutdownLatch = new CountDownLatch(1);
this.readIndexQueue.publishEvent((event, sequence) -> event.shutdownLatch = ReadOnlyServiceImpl.this.shutdownLatch);
this.readIndexQueue.publishEvent((event, sequence) -> event.shutdownLatch = this.shutdownLatch);
this.scheduledExecutorService.shutdown();
}

Expand All @@ -245,7 +245,7 @@ public void join() throws InterruptedException {
@Override
public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
if (this.shutdownLatch != null) {
Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "was stopped"));
Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
throw new IllegalStateException("Service already shutdown.");
}
try {
Expand All @@ -260,7 +260,7 @@ public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
}

/**
* Called when lastAppliedIndex updates
* Called when lastAppliedIndex updates.
*
* @param appliedIndex applied index
*/
Expand Down

0 comments on commit 52d0182

Please sign in to comment.