Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) Impl SegmentList to replace ArrayDequeue in LogManagerImpl, #335 #377

Merged
merged 7 commits into from
Dec 30, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,6 @@ private static void onTimeout(final ThreadId id) {
void destroy() {
final ThreadId savedId = this.id;
LOG.info("Replicator {} is going to quit", savedId);
this.id = null;
releaseReader();
// Unregister replicator metric set
if (this.nodeMetrics.isEnabled()) {
Expand All @@ -1058,6 +1057,7 @@ void destroy() {
this.state = State.Destroyed;
notifyReplicatorStatusListener((Replicator) savedId.getData(), ReplicatorEvent.DESTROYED);
savedId.unlockAndDestroy();
this.id = null;
}

private void releaseReader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.SegmentList;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.Utils;
import com.lmax.disruptor.EventFactory;
Expand Down Expand Up @@ -91,7 +92,7 @@ public class LogManagerImpl implements LogManager {
private LogId diskId = new LogId(0, 0);
private LogId appliedId = new LogId(0, 0);
// TODO use a lock-free concurrent list instead?
private ArrayDeque<LogEntry> logsInMemory = new ArrayDeque<>();
private final SegmentList<LogEntry> logsInMemory = new SegmentList<>();
private volatile long firstLogIndex;
private volatile long lastLogIndex;
private volatile LogId lastSnapshotId = new LogId(0, 0);
Expand Down Expand Up @@ -253,16 +254,8 @@ public void shutdown() {
private void clearMemoryLogs(final LogId id) {
this.writeLock.lock();
try {
int index = 0;
for (final int size = this.logsInMemory.size(); index < size; index++) {
final LogEntry entry = this.logsInMemory.get(index);
if (entry.getId().compareTo(id) > 0) {
break;
}
}
if (index > 0) {
this.logsInMemory.removeRange(0, index);
}

this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().compareTo(id) <= 0);
} finally {
this.writeLock.unlock();
}
Expand Down Expand Up @@ -698,7 +691,8 @@ public void clearBufferedLogs() {
private String descLogsInMemory() {
final StringBuilder sb = new StringBuilder();
boolean wasFirst = true;
for (final LogEntry logEntry : this.logsInMemory) {
for (int i = 0; i < this.logsInMemory.size(); i++) {
LogEntry logEntry = this.logsInMemory.get(i);
if (!wasFirst) {
sb.append(",");
} else {
Expand Down Expand Up @@ -933,20 +927,12 @@ public void run(final Status status) {
}

private boolean truncatePrefix(final long firstIndexKept) {
int index = 0;
for (final int size = this.logsInMemory.size(); index < size; index++) {
final LogEntry entry = this.logsInMemory.get(index);
if (entry.getId().getIndex() >= firstIndexKept) {
break;
}
}
if (index > 0) {
this.logsInMemory.removeRange(0, index);
}

this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().getIndex() < firstIndexKept);

// TODO maybe it's fine here
Requires.requireTrue(firstIndexKept >= this.firstLogIndex,
"Try to truncate logs before %d, but the firstLogIndex is %d", firstIndexKept, this.firstLogIndex);
"Try to truncate logs before %d, but the firstLogIndex is %d", firstIndexKept, this.firstLogIndex);

this.firstLogIndex = firstIndexKept;
if (firstIndexKept > this.lastLogIndex) {
Expand All @@ -963,7 +949,7 @@ private boolean truncatePrefix(final long firstIndexKept) {
private boolean reset(final long nextLogIndex) {
this.writeLock.lock();
try {
this.logsInMemory = new ArrayDeque<>();
this.logsInMemory.clear();
this.firstLogIndex = nextLogIndex;
this.lastLogIndex = nextLogIndex - 1;
this.configManager.truncatePrefix(this.firstLogIndex);
Expand All @@ -982,14 +968,9 @@ private void unsafeTruncateSuffix(final long lastIndexKept) {
lastIndexKept);
return;
}
while (!this.logsInMemory.isEmpty()) {
final LogEntry entry = this.logsInMemory.peekLast();
if (entry.getId().getIndex() > lastIndexKept) {
this.logsInMemory.pollLast();
} else {
break;
}
}

this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept);

this.lastLogIndex = lastIndexKept;
final long lastTermKept = unsafeGetTerm(lastIndexKept);
Requires.requireTrue(this.lastLogIndex == 0 || lastTermKept != 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ public abstract class RepeatedTimer implements Describer {
private Timeout timeout;
private boolean stopped;
private volatile boolean running;
private boolean destroyed;
private boolean invoking;
private volatile boolean destroyed;
private volatile boolean invoking;
private volatile int timeoutMs;
private final String name;

public int getTimeoutMs() {
return this.timeoutMs;
}

public RepeatedTimer(String name, int timeoutMs) {
public RepeatedTimer(final String name, final int timeoutMs) {
this(name, timeoutMs, new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048));
}

public RepeatedTimer(String name, int timeoutMs, Timer timer) {
public RepeatedTimer(final String name, final int timeoutMs, final Timer timer) {
super();
this.name = name;
this.timeoutMs = timeoutMs;
Expand All @@ -81,12 +81,7 @@ protected int adjustTimeout(final int timeoutMs) {
}

public void run() {
this.lock.lock();
try {
this.invoking = true;
} finally {
this.lock.unlock();
}
this.invoking = true;
try {
onTrigger();
} catch (final Throwable t) {
Expand Down Expand Up @@ -157,7 +152,7 @@ public void start() {
}

private void schedule() {
if(this.timeout != null) {
if (this.timeout != null) {
this.timeout.cancel();
}
final TimerTask timerTask = timeout -> {
Expand Down Expand Up @@ -216,8 +211,6 @@ public void destroy() {
if (!this.running) {
invokeDestroyed = true;
}
// Timer#stop is idempotent
this.timer.stop();
if (this.stopped) {
return;
}
Expand All @@ -231,6 +224,7 @@ public void destroy() {
}
} finally {
this.lock.unlock();
this.timer.stop();
if (invokeDestroyed) {
onDestroy();
}
Expand Down Expand Up @@ -272,8 +266,8 @@ public void describe(final Printer out) {

@Override
public String toString() {
return "RepeatedTimer{" + "timeout=" + timeout + ", stopped=" + stopped + ", running=" + running
+ ", destroyed=" + destroyed + ", invoking=" + invoking + ", timeoutMs=" + timeoutMs + ", name='" + name
+ '\'' + '}';
return "RepeatedTimer{" + "timeout=" + this.timeout + ", stopped=" + this.stopped + ", running=" + this.running
+ ", destroyed=" + this.destroyed + ", invoking=" + this.invoking + ", timeoutMs=" + this.timeoutMs
+ ", name='" + this.name + '\'' + '}';
}
}
Loading