Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) Impl SegmentList to replace ArrayDequeue in LogManagerImpl, #335 #377

Merged
merged 7 commits into from
Dec 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import com.alipay.sofa.jraft.entity.Ballot;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.BallotBoxOptions;
import com.alipay.sofa.jraft.util.ArrayDeque;
import com.alipay.sofa.jraft.util.Describer;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.SegmentList;

/**
* Ballot box for voting.
Expand All @@ -45,22 +45,22 @@
@ThreadSafe
public class BallotBox implements Lifecycle<BallotBoxOptions>, Describer {

private static final Logger LOG = LoggerFactory.getLogger(BallotBox.class);
private static final Logger LOG = LoggerFactory.getLogger(BallotBox.class);

private FSMCaller waiter;
private ClosureQueue closureQueue;
private final StampedLock stampedLock = new StampedLock();
private long lastCommittedIndex = 0;
private long pendingIndex;
private final ArrayDeque<Ballot> pendingMetaQueue = new ArrayDeque<>();
private FSMCaller waiter;
private ClosureQueue closureQueue;
private final StampedLock stampedLock = new StampedLock();
private long lastCommittedIndex = 0;
private long pendingIndex;
private final SegmentList<Ballot> pendingMetaQueue = new SegmentList<>(false);

@OnlyForTest
long getPendingIndex() {
return this.pendingIndex;
}

@OnlyForTest
ArrayDeque<Ballot> getPendingMetaQueue() {
SegmentList<Ballot> getPendingMetaQueue() {
return this.pendingMetaQueue;
}

Expand Down Expand Up @@ -95,7 +95,7 @@ public boolean init(final BallotBoxOptions opts) {
*/
public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
// TODO use lock-free algorithm here?
final long stamp = stampedLock.writeLock();
final long stamp = this.stampedLock.writeLock();
long lastCommittedIndex = 0;
try {
if (this.pendingIndex == 0) {
Expand Down Expand Up @@ -127,7 +127,7 @@ public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final
// logs, since we use the new configuration to deal the quorum of the
// removal request, we think it's safe to commit all the uncommitted
// previous logs, which is not well proved right now
this.pendingMetaQueue.removeRange(0, (int) (lastCommittedIndex - this.pendingIndex) + 1);
this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
LOG.debug("Committed log fromIndex={}, toIndex={}.", this.pendingIndex, lastCommittedIndex);
this.pendingIndex = lastCommittedIndex + 1;
this.lastCommittedIndex = lastCommittedIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,6 @@ private static void onTimeout(final ThreadId id) {
void destroy() {
final ThreadId savedId = this.id;
LOG.info("Replicator {} is going to quit", savedId);
this.id = null;
releaseReader();
// Unregister replicator metric set
if (this.nodeMetrics.isEnabled()) {
Expand All @@ -1058,6 +1057,7 @@ void destroy() {
this.state = State.Destroyed;
notifyReplicatorStatusListener((Replicator) savedId.getData(), ReplicatorEvent.DESTROYED);
savedId.unlockAndDestroy();
this.id = null;
}

private void releaseReader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.SegmentList;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.Utils;
import com.lmax.disruptor.EventFactory;
Expand Down Expand Up @@ -90,8 +91,7 @@ public class LogManagerImpl implements LogManager {
private long nextWaitId;
private LogId diskId = new LogId(0, 0);
private LogId appliedId = new LogId(0, 0);
// TODO use a lock-free concurrent list instead?
private ArrayDeque<LogEntry> logsInMemory = new ArrayDeque<>();
private final SegmentList<LogEntry> logsInMemory = new SegmentList<>(true);
private volatile long firstLogIndex;
private volatile long lastLogIndex;
private volatile LogId lastSnapshotId = new LogId(0, 0);
Expand Down Expand Up @@ -253,16 +253,8 @@ public void shutdown() {
private void clearMemoryLogs(final LogId id) {
this.writeLock.lock();
try {
int index = 0;
for (final int size = this.logsInMemory.size(); index < size; index++) {
final LogEntry entry = this.logsInMemory.get(index);
if (entry.getId().compareTo(id) > 0) {
break;
}
}
if (index > 0) {
this.logsInMemory.removeRange(0, index);
}

this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().compareTo(id) <= 0);
} finally {
this.writeLock.unlock();
}
Expand Down Expand Up @@ -698,7 +690,8 @@ public void clearBufferedLogs() {
private String descLogsInMemory() {
final StringBuilder sb = new StringBuilder();
boolean wasFirst = true;
for (final LogEntry logEntry : this.logsInMemory) {
for (int i = 0; i < this.logsInMemory.size(); i++) {
LogEntry logEntry = this.logsInMemory.get(i);
if (!wasFirst) {
sb.append(",");
} else {
Expand Down Expand Up @@ -933,20 +926,12 @@ public void run(final Status status) {
}

private boolean truncatePrefix(final long firstIndexKept) {
int index = 0;
for (final int size = this.logsInMemory.size(); index < size; index++) {
final LogEntry entry = this.logsInMemory.get(index);
if (entry.getId().getIndex() >= firstIndexKept) {
break;
}
}
if (index > 0) {
this.logsInMemory.removeRange(0, index);
}

this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().getIndex() < firstIndexKept);

// TODO maybe it's fine here
Requires.requireTrue(firstIndexKept >= this.firstLogIndex,
"Try to truncate logs before %d, but the firstLogIndex is %d", firstIndexKept, this.firstLogIndex);
"Try to truncate logs before %d, but the firstLogIndex is %d", firstIndexKept, this.firstLogIndex);

this.firstLogIndex = firstIndexKept;
if (firstIndexKept > this.lastLogIndex) {
Expand All @@ -963,7 +948,7 @@ private boolean truncatePrefix(final long firstIndexKept) {
private boolean reset(final long nextLogIndex) {
this.writeLock.lock();
try {
this.logsInMemory = new ArrayDeque<>();
this.logsInMemory.clear();
this.firstLogIndex = nextLogIndex;
this.lastLogIndex = nextLogIndex - 1;
this.configManager.truncatePrefix(this.firstLogIndex);
Expand All @@ -982,14 +967,9 @@ private void unsafeTruncateSuffix(final long lastIndexKept) {
lastIndexKept);
return;
}
while (!this.logsInMemory.isEmpty()) {
final LogEntry entry = this.logsInMemory.peekLast();
if (entry.getId().getIndex() > lastIndexKept) {
this.logsInMemory.pollLast();
} else {
break;
}
}

this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept);

this.lastLogIndex = lastIndexKept;
final long lastTermKept = unsafeGetTerm(lastIndexKept);
Requires.requireTrue(this.lastLogIndex == 0 || lastTermKept != 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ public abstract class RepeatedTimer implements Describer {
private Timeout timeout;
private boolean stopped;
private volatile boolean running;
private boolean destroyed;
private boolean invoking;
private volatile boolean destroyed;
private volatile boolean invoking;
private volatile int timeoutMs;
private final String name;

public int getTimeoutMs() {
return this.timeoutMs;
}

public RepeatedTimer(String name, int timeoutMs) {
public RepeatedTimer(final String name, final int timeoutMs) {
this(name, timeoutMs, new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048));
}

public RepeatedTimer(String name, int timeoutMs, Timer timer) {
public RepeatedTimer(final String name, final int timeoutMs, final Timer timer) {
super();
this.name = name;
this.timeoutMs = timeoutMs;
Expand All @@ -81,12 +81,7 @@ protected int adjustTimeout(final int timeoutMs) {
}

public void run() {
this.lock.lock();
try {
this.invoking = true;
} finally {
this.lock.unlock();
}
this.invoking = true;
try {
onTrigger();
} catch (final Throwable t) {
Expand Down Expand Up @@ -157,7 +152,7 @@ public void start() {
}

private void schedule() {
if(this.timeout != null) {
if (this.timeout != null) {
this.timeout.cancel();
}
final TimerTask timerTask = timeout -> {
Expand Down Expand Up @@ -216,8 +211,6 @@ public void destroy() {
if (!this.running) {
invokeDestroyed = true;
}
// Timer#stop is idempotent
this.timer.stop();
if (this.stopped) {
return;
}
Expand All @@ -231,6 +224,7 @@ public void destroy() {
}
} finally {
this.lock.unlock();
this.timer.stop();
if (invokeDestroyed) {
onDestroy();
}
Expand Down Expand Up @@ -272,8 +266,8 @@ public void describe(final Printer out) {

@Override
public String toString() {
return "RepeatedTimer{" + "timeout=" + timeout + ", stopped=" + stopped + ", running=" + running
+ ", destroyed=" + destroyed + ", invoking=" + invoking + ", timeoutMs=" + timeoutMs + ", name='" + name
+ '\'' + '}';
return "RepeatedTimer{" + "timeout=" + this.timeout + ", stopped=" + this.stopped + ", running=" + this.running
+ ", destroyed=" + this.destroyed + ", invoking=" + this.invoking + ", timeoutMs=" + this.timeoutMs
+ ", name='" + this.name + '\'' + '}';
}
}
Loading