Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(bugfix) fix rheakv state machine bug #137

Merged
merged 18 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void setErrorAndRollback(final long ntail, final Status st) {
this.currEntry = null;
getOrCreateError().setType(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE);
getOrCreateError().getStatus().setError(RaftError.ESTATEMACHINE,
"StateMachine meet critical error when applying one or more tasks since index=%d, %s", this.currentIndex,
"StateMachine meet critical error when applying one or more tasks since index=%d, %s", this.currentIndex,
st != null ? st.toString() : "none");

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void testRunTheRestClosureWithError() throws Exception {
final Status s = mc.s;
Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), s.getCode());
assertEquals(
"StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]",
"StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]",
s.getErrorMsg());
}
}
Expand All @@ -129,7 +129,7 @@ public void testSetErrorAndRollback() {
Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), iter.getError().getStatus().getCode());
Assert
.assertEquals(
"StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]",
"StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]",
iter.getError().getStatus().getErrorMsg());
assertEquals(6, iter.getIndex());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testSetErrorAndRollback() {
Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), iterImpl.getError().getStatus().getCode());
Assert
.assertEquals(
"StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]",
"StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]",
iterImpl.getError().getStatus().getErrorMsg());
assertEquals(6, iter.getIndex());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,8 @@ public void run(final Status status) {

private static void setFailure(final BaseRequest request, final BaseResponse<?> response, final Status status,
final Errors error) {
response.setError(error);
response.setError(error == null ? Errors.STORAGE_ERROR : error);
response.setErrorMsg(status.toString());
LOG.error("Failed to handle: {}, status: {}, error: {}.", request, status, error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public Status rangeSplit(final long regionId, final long newRegionId, final Stri
if (response.isSuccess()) {
return Status.OK();
}
return new Status(-1, "fail to range split on region %d, error: %s", regionId, String.valueOf(response
.getError()));
return new Status(-1, "Fail to range split on region %d, error: %s", regionId, response);
} catch (final Exception e) {
LOG.error("Fail to range split on exception: {}.", StackTraceUtil.stackTrace(e));
return new Status(-1, "fail to range split on region %d", regionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class BaseResponse<T> implements Serializable {
private static final long serialVersionUID = 8411573936817037697L;

private Errors error = Errors.NONE;
private String errorMsg;
private long regionId;
private RegionEpoch regionEpoch;
private T value;
Expand All @@ -47,6 +48,14 @@ public void setError(Errors error) {
this.error = error;
}

public String getErrorMsg() {
return errorMsg;
}

public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}

public long getRegionId() {
return regionId;
}
Expand All @@ -73,7 +82,7 @@ public void setValue(T value) {

@Override
public String toString() {
return "BaseResponse{" + "error=" + error + ", regionId=" + regionId + ", regionEpoch=" + regionEpoch
+ ", value=" + value + '}';
return "BaseResponse{" + "error=" + error + ", errorMsg='" + errorMsg + '\'' + ", regionId=" + regionId
+ ", regionEpoch=" + regionEpoch + ", value=" + value + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rhea.errors.Errors;
import com.alipay.sofa.jraft.rhea.errors.StorageException;
import com.alipay.sofa.jraft.rhea.metrics.KVMetrics;
import com.alipay.sofa.jraft.rhea.util.StackTraceUtil;
import com.codahale.metrics.Timer;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void execute(final NodeExecutor nodeExecutor, final boolean isLeader, fin
if (nodeExecutor != null) {
nodeExecutor.execute(new Status(RaftError.EIO, "Fail to [EXECUTE]"), isLeader);
}
setFailure(closure, "Fail to [EXECUTE]");
setCriticalError(closure, "Fail to [EXECUTE]", e);
} finally {
timeCtx.stop();
}
Expand Down Expand Up @@ -110,6 +111,13 @@ static Timer.Context getTimeContext(final String opName) {
return KVMetrics.timer(DB_TIMER, opName).time();
}

/**
* Sets success, if current node is a leader, reply to
* client success with result data response.
*
* @param closure callback
* @param data result data to reply to client
*/
static void setSuccess(final KVStoreClosure closure, final Object data) {
if (closure != null) {
// closure is null on follower node
Expand All @@ -118,11 +126,63 @@ static void setSuccess(final KVStoreClosure closure, final Object data) {
}
}

/**
* Sets failure, if current node is a leader, reply to
* client failure response.
*
* @param closure callback
* @param message error message
*/
static void setFailure(final KVStoreClosure closure, final String message) {
if (closure != null) {
// closure is null on follower node
closure.setError(Errors.STORAGE_ERROR);
closure.run(new Status(RaftError.EIO, message));
}
}

/**
* Sets critical error and halt the state machine.
*
* If current node is a leader, first reply to client
* failure response.
*
* @param closure callback
* @param message error message
* @param error critical error
*/
static void setCriticalError(final KVStoreClosure closure, final String message, final Throwable error) {
setClosureError(closure);
// Will call closure#run in FSMCaller
if (error != null) {
throw new StorageException(message, error);
}
}

/**
* Sets critical error and halt the state machine.
*
* If current node is a leader, first reply to client
* failure response.
*
* @param closures callback list
* @param message error message
* @param error critical error
*/
static void setCriticalError(final List<KVStoreClosure> closures, final String message, final Throwable error) {
for (final KVStoreClosure closure : closures) {
// Will call closure#run in FSMCaller
setClosureError(closure);
}
if (error != null) {
throw new StorageException(message, error);
}
}

static void setClosureError(final KVStoreClosure closure) {
if (closure != null) {
// closure is null on follower node
closure.setError(Errors.STORAGE_ERROR);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.alipay.sofa.jraft.rhea.serialization.Serializers;
import com.alipay.sofa.jraft.rhea.util.Pair;
import com.alipay.sofa.jraft.rhea.util.RecycleUtil;
import com.alipay.sofa.jraft.rhea.util.StackTraceUtil;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.BytesUtil;
Expand Down Expand Up @@ -80,62 +81,74 @@ public KVStoreStateMachine(Region region, StoreEngine storeEngine) {

@Override
public void onApply(final Iterator it) {
int stCount = 0;
KVStateOutputList kvStates = KVStateOutputList.newInstance();
while (it.hasNext()) {
KVOperation kvOp;
final KVClosureAdapter done = (KVClosureAdapter) it.done();
if (done != null) {
kvOp = done.getOperation();
} else {
final ByteBuffer buf = it.getData();
try {
if (buf.hasArray()) {
kvOp = this.serializer.readObject(buf.array(), KVOperation.class);
} else {
kvOp = this.serializer.readObject(buf, KVOperation.class);
int index = 0;
int applied = 0;
try {
KVStateOutputList kvStates = KVStateOutputList.newInstance();
while (it.hasNext()) {
KVOperation kvOp;
final KVClosureAdapter done = (KVClosureAdapter) it.done();
if (done != null) {
kvOp = done.getOperation();
} else {
final ByteBuffer buf = it.getData();
try {
if (buf.hasArray()) {
kvOp = this.serializer.readObject(buf.array(), KVOperation.class);
} else {
kvOp = this.serializer.readObject(buf, KVOperation.class);
}
} catch (final Throwable t) {
++index;
throw new StoreCodecException("Decode operation error", t);
}
} catch (final Throwable t) {
throw new StoreCodecException("Decode operation error", t);
}
final KVState first = kvStates.getFirstElement();
if (first != null && !first.isSameOp(kvOp)) {
applied += batchApplyAndRecycle(first.getOpByte(), kvStates);
kvStates = KVStateOutputList.newInstance();
}
kvStates.add(KVState.of(kvOp, done));
++index;
it.next();
}
final KVState first = kvStates.getFirstElement();
if (first != null && !first.isSameOp(kvOp)) {
batchApplyAndRecycle(first.getOpByte(), kvStates);
kvStates = KVStateOutputList.newInstance();
if (!kvStates.isEmpty()) {
final KVState first = kvStates.getFirstElement();
assert first != null;
applied += batchApplyAndRecycle(first.getOpByte(), kvStates);
}
kvStates.add(KVState.of(kvOp, done));
++stCount;
it.next();
}
if (!kvStates.isEmpty()) {
final KVState first = kvStates.getFirstElement();
assert first != null;
batchApplyAndRecycle(first.getOpByte(), kvStates);
} catch (final Throwable t) {
LOG.error("StateMachine meet critical error: {}.", StackTraceUtil.stackTrace(t));
it.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE,
"StateMachine meet critical error: %s.", t.getMessage()));
} finally {
// metrics: qps
this.applyMeter.mark(applied);
}

// metrics: qps
this.applyMeter.mark(stCount);
}

private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) {
private int batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) {
try {
if (kvStates.isEmpty()) {
return;
final int size = kvStates.size();

if (size == 0) {
return 0;
}

if (!KVOperation.isValidOp(opByte)) {
throw new IllegalKVOperationException("Unknown operation: " + opByte);
}

// metrics: op qps
final Meter opApplyMeter = KVMetrics.meter(STATE_MACHINE_APPLY_QPS, String.valueOf(this.region.getId()),
KVOperation.opName(opByte));
final int size = kvStates.size();
opApplyMeter.mark(size);
this.batchWriteHistogram.update(size);

// do batch apply
batchApply(opByte, kvStates);

return size;
} finally {
RecycleUtil.recycle(kvStates);
}
Expand Down
Loading