diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java index 0891a90b8..a272b2fc0 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java @@ -147,7 +147,7 @@ public void setErrorAndRollback(final long ntail, final Status st) { this.currEntry = null; getOrCreateError().setType(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE); getOrCreateError().getStatus().setError(RaftError.ESTATEMACHINE, - "StateMachine meet critical error when applying one or more tasks since index=%d, %s", this.currentIndex, + "StateMachine meet critical error when applying one or more tasks since index=%d, %s", this.currentIndex, st != null ? st.toString() : "none"); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java index a61a2d9a6..2de7e32e4 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java @@ -113,7 +113,7 @@ public void testRunTheRestClosureWithError() throws Exception { final Status s = mc.s; Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), s.getCode()); assertEquals( - "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", + "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", s.getErrorMsg()); } } @@ -129,7 +129,7 @@ public void testSetErrorAndRollback() { Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), iter.getError().getStatus().getCode()); Assert .assertEquals( - "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", + "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", iter.getError().getStatus().getErrorMsg()); assertEquals(6, iter.getIndex()); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java index d4061cacb..a7221931b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java @@ -107,7 +107,7 @@ public void testSetErrorAndRollback() { Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), iterImpl.getError().getStatus().getCode()); Assert .assertEquals( - "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", + "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", iterImpl.getError().getStatus().getErrorMsg()); assertEquals(6, iter.getIndex()); } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java index 37d073bf6..8ae41fcfc 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java @@ -565,7 +565,8 @@ public void run(final Status status) { private static void setFailure(final BaseRequest request, final BaseResponse response, final Status status, final Errors error) { - response.setError(error); + response.setError(error == null ? Errors.STORAGE_ERROR : error); + response.setErrorMsg(status.toString()); LOG.error("Failed to handle: {}, status: {}, error: {}.", request, status, error); } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java index 6a4e36701..ee8a76619 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java @@ -84,8 +84,7 @@ public Status rangeSplit(final long regionId, final long newRegionId, final Stri if (response.isSuccess()) { return Status.OK(); } - return new Status(-1, "fail to range split on region %d, error: %s", regionId, String.valueOf(response - .getError())); + return new Status(-1, "Fail to range split on region %d, error: %s", regionId, response); } catch (final Exception e) { LOG.error("Fail to range split on exception: {}.", StackTraceUtil.stackTrace(e)); return new Status(-1, "fail to range split on region %d", regionId); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java index a50d96f9d..a568d5b7f 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java @@ -31,6 +31,7 @@ public class BaseResponse implements Serializable { private static final long serialVersionUID = 8411573936817037697L; private Errors error = Errors.NONE; + private String errorMsg; private long regionId; private RegionEpoch regionEpoch; private T value; @@ -47,6 +48,14 @@ public void setError(Errors error) { this.error = error; } + public String getErrorMsg() { + return errorMsg; + } + + public void setErrorMsg(String errorMsg) { + this.errorMsg = errorMsg; + } + public long getRegionId() { return regionId; } @@ -73,7 +82,7 @@ public void setValue(T value) { @Override public String toString() { - return "BaseResponse{" + "error=" + error + ", regionId=" + regionId + ", regionEpoch=" + regionEpoch - + ", value=" + value + '}'; + return "BaseResponse{" + "error=" + error + ", errorMsg='" + errorMsg + '\'' + ", regionId=" + regionId + + ", regionEpoch=" + regionEpoch + ", value=" + value + '}'; } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java index 1ccd5b8da..f46f8791b 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java @@ -25,6 +25,7 @@ import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rhea.errors.Errors; +import com.alipay.sofa.jraft.rhea.errors.StorageException; import com.alipay.sofa.jraft.rhea.metrics.KVMetrics; import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; import com.codahale.metrics.Timer; @@ -76,7 +77,7 @@ public void execute(final NodeExecutor nodeExecutor, final boolean isLeader, fin if (nodeExecutor != null) { nodeExecutor.execute(new Status(RaftError.EIO, "Fail to [EXECUTE]"), isLeader); } - setFailure(closure, "Fail to [EXECUTE]"); + setCriticalError(closure, "Fail to [EXECUTE]", e); } finally { timeCtx.stop(); } @@ -110,6 +111,13 @@ static Timer.Context getTimeContext(final String opName) { return KVMetrics.timer(DB_TIMER, opName).time(); } + /** + * Sets success, if current node is a leader, reply to + * client success with result data response. + * + * @param closure callback + * @param data result data to reply to client + */ static void setSuccess(final KVStoreClosure closure, final Object data) { if (closure != null) { // closure is null on follower node @@ -118,6 +126,13 @@ static void setSuccess(final KVStoreClosure closure, final Object data) { } } + /** + * Sets failure, if current node is a leader, reply to + * client failure response. + * + * @param closure callback + * @param message error message + */ static void setFailure(final KVStoreClosure closure, final String message) { if (closure != null) { // closure is null on follower node @@ -125,4 +140,49 @@ static void setFailure(final KVStoreClosure closure, final String message) { closure.run(new Status(RaftError.EIO, message)); } } + + /** + * Sets critical error and halt the state machine. + * + * If current node is a leader, first reply to client + * failure response. + * + * @param closure callback + * @param message error message + * @param error critical error + */ + static void setCriticalError(final KVStoreClosure closure, final String message, final Throwable error) { + setClosureError(closure); + // Will call closure#run in FSMCaller + if (error != null) { + throw new StorageException(message, error); + } + } + + /** + * Sets critical error and halt the state machine. + * + * If current node is a leader, first reply to client + * failure response. + * + * @param closures callback list + * @param message error message + * @param error critical error + */ + static void setCriticalError(final List closures, final String message, final Throwable error) { + for (final KVStoreClosure closure : closures) { + // Will call closure#run in FSMCaller + setClosureError(closure); + } + if (error != null) { + throw new StorageException(message, error); + } + } + + static void setClosureError(final KVStoreClosure closure) { + if (closure != null) { + // closure is null on follower node + closure.setError(Errors.STORAGE_ERROR); + } + } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java index e60fc9784..339327535 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java @@ -40,6 +40,7 @@ import com.alipay.sofa.jraft.rhea.serialization.Serializers; import com.alipay.sofa.jraft.rhea.util.Pair; import com.alipay.sofa.jraft.rhea.util.RecycleUtil; +import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.alipay.sofa.jraft.util.BytesUtil; @@ -80,49 +81,60 @@ public KVStoreStateMachine(Region region, StoreEngine storeEngine) { @Override public void onApply(final Iterator it) { - int stCount = 0; - KVStateOutputList kvStates = KVStateOutputList.newInstance(); - while (it.hasNext()) { - KVOperation kvOp; - final KVClosureAdapter done = (KVClosureAdapter) it.done(); - if (done != null) { - kvOp = done.getOperation(); - } else { - final ByteBuffer buf = it.getData(); - try { - if (buf.hasArray()) { - kvOp = this.serializer.readObject(buf.array(), KVOperation.class); - } else { - kvOp = this.serializer.readObject(buf, KVOperation.class); + int index = 0; + int applied = 0; + try { + KVStateOutputList kvStates = KVStateOutputList.newInstance(); + while (it.hasNext()) { + KVOperation kvOp; + final KVClosureAdapter done = (KVClosureAdapter) it.done(); + if (done != null) { + kvOp = done.getOperation(); + } else { + final ByteBuffer buf = it.getData(); + try { + if (buf.hasArray()) { + kvOp = this.serializer.readObject(buf.array(), KVOperation.class); + } else { + kvOp = this.serializer.readObject(buf, KVOperation.class); + } + } catch (final Throwable t) { + ++index; + throw new StoreCodecException("Decode operation error", t); } - } catch (final Throwable t) { - throw new StoreCodecException("Decode operation error", t); } + final KVState first = kvStates.getFirstElement(); + if (first != null && !first.isSameOp(kvOp)) { + applied += batchApplyAndRecycle(first.getOpByte(), kvStates); + kvStates = KVStateOutputList.newInstance(); + } + kvStates.add(KVState.of(kvOp, done)); + ++index; + it.next(); } - final KVState first = kvStates.getFirstElement(); - if (first != null && !first.isSameOp(kvOp)) { - batchApplyAndRecycle(first.getOpByte(), kvStates); - kvStates = KVStateOutputList.newInstance(); + if (!kvStates.isEmpty()) { + final KVState first = kvStates.getFirstElement(); + assert first != null; + applied += batchApplyAndRecycle(first.getOpByte(), kvStates); } - kvStates.add(KVState.of(kvOp, done)); - ++stCount; - it.next(); - } - if (!kvStates.isEmpty()) { - final KVState first = kvStates.getFirstElement(); - assert first != null; - batchApplyAndRecycle(first.getOpByte(), kvStates); + } catch (final Throwable t) { + LOG.error("StateMachine meet critical error: {}.", StackTraceUtil.stackTrace(t)); + it.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE, + "StateMachine meet critical error: %s.", t.getMessage())); + } finally { + // metrics: qps + this.applyMeter.mark(applied); } - - // metrics: qps - this.applyMeter.mark(stCount); } - private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) { + private int batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) { try { - if (kvStates.isEmpty()) { - return; + final int size = kvStates.size(); + + if (size == 0) { + return 0; } + if (!KVOperation.isValidOp(opByte)) { throw new IllegalKVOperationException("Unknown operation: " + opByte); } @@ -130,12 +142,13 @@ private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvS // metrics: op qps final Meter opApplyMeter = KVMetrics.meter(STATE_MACHINE_APPLY_QPS, String.valueOf(this.region.getId()), KVOperation.opName(opByte)); - final int size = kvStates.size(); opApplyMeter.mark(size); this.batchWriteHistogram.update(size); // do batch apply batchApply(opByte, kvStates); + + return size; } finally { RecycleUtil.recycle(kvStates); } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java index 2d043f5cb..d26d85f93 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java @@ -179,7 +179,7 @@ public void getSequence(final byte[] seqKey, final int step, final KVStoreClosur } catch (final Exception e) { LOG.error("Fail to [GET_SEQUENCE], [key = {}, step = {}], {}.", BytesUtil.toHex(seqKey), step, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_SEQUENCE]"); + setCriticalError(closure, "Fail to [GET_SEQUENCE]", e); } finally { timeCtx.stop(); } @@ -194,7 +194,7 @@ public void resetSequence(final byte[] seqKey, final KVStoreClosure closure) { } catch (final Exception e) { LOG.error("Fail to [RESET_SEQUENCE], [key = {}], {}.", BytesUtil.toHex(seqKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RESET_SEQUENCE]"); + setCriticalError(closure, "Fail to [RESET_SEQUENCE]", e); } finally { timeCtx.stop(); } @@ -209,7 +209,7 @@ public void put(final byte[] key, final byte[] value, final KVStoreClosure closu } catch (final Exception e) { LOG.error("Fail to [PUT], [{}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT]"); + setCriticalError(closure, "Fail to [PUT]", e); } finally { timeCtx.stop(); } @@ -224,7 +224,7 @@ public void getAndPut(final byte[] key, final byte[] value, final KVStoreClosure } catch (final Exception e) { LOG.error("Fail to [GET_PUT], [{}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_PUT]"); + setCriticalError(closure, "Fail to [GET_PUT]", e); } finally { timeCtx.stop(); } @@ -249,7 +249,7 @@ public void merge(final byte[] key, final byte[] value, final KVStoreClosure clo } catch (final Exception e) { LOG.error("Fail to [MERGE], [{}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [MERGE]"); + setCriticalError(closure, "Fail to [MERGE]", e); } finally { timeCtx.stop(); } @@ -265,7 +265,7 @@ public void put(final List entries, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Failed to [PUT_LIST], [size = {}], {}.", entries.size(), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_LIST]"); + setCriticalError(closure, "Fail to [PUT_LIST]", e); } finally { timeCtx.stop(); } @@ -280,7 +280,7 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } catch (final Exception e) { LOG.error("Fail to [PUT_IF_ABSENT], [{}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_IF_ABSENT]"); + setCriticalError(closure, "Fail to [PUT_IF_ABSENT]", e); } finally { timeCtx.stop(); } @@ -437,7 +437,7 @@ public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [TRY_LOCK], [{}, {}], {}.", BytesUtil.toHex(key), acquirer, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [TRY_LOCK]"); + setCriticalError(closure, "Fail to [TRY_LOCK]", e); } finally { timeCtx.stop(); } @@ -511,7 +511,7 @@ public void releaseLockWith(final byte[] key, final DistributedLock.Acquirer acq setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [RELEASE_LOCK], [{}], {}.", BytesUtil.toHex(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RELEASE_LOCK]"); + setCriticalError(closure, "Fail to [RELEASE_LOCK]", e); } finally { timeCtx.stop(); } @@ -543,7 +543,7 @@ public void delete(final byte[] key, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Fail to [DELETE], [{}], {}.", BytesUtil.toHex(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE]"); + setCriticalError(closure, "Fail to [DELETE]", e); } finally { timeCtx.stop(); } @@ -561,7 +561,7 @@ public void deleteRange(final byte[] startKey, final byte[] endKey, final KVStor } catch (final Exception e) { LOG.error("Fail to [DELETE_RANGE], ['[{}, {})'], {}.", BytesUtil.toHex(startKey), BytesUtil.toHex(endKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE_RANGE]"); + setCriticalError(closure, "Fail to [DELETE_RANGE]", e); } finally { timeCtx.stop(); } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java index be3aa089d..fa2c5edd8 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java @@ -345,7 +345,7 @@ public void getSequence(final byte[] seqKey, final int step, final KVStoreClosur } catch (final Exception e) { LOG.error("Fail to [GET_SEQUENCE], [key = {}, step = {}], {}.", BytesUtil.toHex(seqKey), step, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_SEQUENCE]"); + setCriticalError(closure, "Fail to [GET_SEQUENCE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -363,7 +363,7 @@ public void resetSequence(final byte[] seqKey, final KVStoreClosure closure) { } catch (final Exception e) { LOG.error("Fail to [RESET_SEQUENCE], [key = {}], {}.", BytesUtil.toHex(seqKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RESET_SEQUENCE]"); + setCriticalError(closure, "Fail to [RESET_SEQUENCE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -391,11 +391,9 @@ public void batchResetSequence(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_RESET_SEQUENCE], [size = {}], {}.", - segment.size(), StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_RESET_SEQUENCE]"); - } + LOG.error("Failed to [BATCH_RESET_SEQUENCE], [size = {}], {}.", segment.size(), + StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_RESET_SEQUENCE]", e); } return null; }); @@ -416,7 +414,7 @@ public void put(final byte[] key, final byte[] value, final KVStoreClosure closu } catch (final Exception e) { LOG.error("Fail to [PUT], [{}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT]"); + setCriticalError(closure, "Fail to [PUT]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -446,11 +444,8 @@ public void batchPut(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_PUT], [size = {}] {}.", segment.size(), - StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_PUT]"); - } + LOG.error("Failed to [BATCH_PUT], [size = {}] {}.", segment.size(), StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_PUT]", e); } return null; }); @@ -472,7 +467,7 @@ public void getAndPut(final byte[] key, final byte[] value, final KVStoreClosure } catch (final Exception e) { LOG.error("Fail to [GET_PUT], [{}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_PUT]"); + setCriticalError(closure, "Fail to [GET_PUT]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -508,10 +503,8 @@ public void batchGetAndPut(final KVStateOutputList kvStates) { } } catch (final Exception e) { LOG.error("Failed to [BATCH_GET_PUT], [size = {}] {}.", segment.size(), - StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_GET_PUT]"); - } + StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_GET_PUT]", e); } return null; }); @@ -532,7 +525,7 @@ public void merge(final byte[] key, final byte[] value, final KVStoreClosure clo } catch (final Exception e) { LOG.error("Fail to [MERGE], [{}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [MERGE]"); + setCriticalError(closure, "Fail to [MERGE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -562,11 +555,8 @@ public void batchMerge(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_MERGE], [size = {}] {}.", segment.size(), - StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_MERGE]"); - } + LOG.error("Failed to [BATCH_MERGE], [size = {}] {}.", segment.size(), StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_MERGE]", e); } return null; }); @@ -589,7 +579,7 @@ public void put(final List entries, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Failed to [PUT_LIST], [size = {}], {}.", entries.size(), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_LIST]"); + setCriticalError(closure, "Fail to [PUT_LIST]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -610,7 +600,7 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } catch (final Exception e) { LOG.error("Fail to [PUT_IF_ABSENT], [{}, {}], {}.", BytesUtil.toHex(key), BytesUtil.toHex(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_IF_ABSENT]"); + setCriticalError(closure, "Fail to [PUT_IF_ABSENT]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -771,7 +761,7 @@ public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [TRY_LOCK], [{}, {}], {}.", BytesUtil.toHex(key), acquirer, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [TRY_LOCK]"); + setCriticalError(closure, "Fail to [TRY_LOCK]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -850,7 +840,7 @@ public void releaseLockWith(final byte[] key, final DistributedLock.Acquirer acq setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [RELEASE_LOCK], [{}], {}.", BytesUtil.toHex(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RELEASE_LOCK]"); + setCriticalError(closure, "Fail to [RELEASE_LOCK]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -894,7 +884,7 @@ public void delete(final byte[] key, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Fail to [DELETE], [{}], {}.", BytesUtil.toHex(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE]"); + setCriticalError(closure, "Fail to [DELETE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -922,11 +912,9 @@ public void batchDelete(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_DELETE], [size = {}], {}.", - segment.size(), StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_DELETE]"); - } + LOG.error("Failed to [BATCH_DELETE], [size = {}], {}.", segment.size(), + StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_DELETE]", e); } return null; }); @@ -947,7 +935,7 @@ public void deleteRange(final byte[] startKey, final byte[] endKey, final KVStor } catch (final Exception e) { LOG.error("Fail to [DELETE_RANGE], ['[{}, {})'], {}.", BytesUtil.toHex(startKey), BytesUtil.toHex(endKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE_RANGE]"); + setCriticalError(closure, "Fail to [DELETE_RANGE]", e); } finally { readLock.unlock(); timeCtx.stop(); diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java new file mode 100644 index 000000000..4f90a8d0a --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rhea.StoreEngine; +import com.alipay.sofa.jraft.rhea.client.pd.FakePlacementDriverClient; +import com.alipay.sofa.jraft.rhea.metadata.Region; +import com.alipay.sofa.jraft.rhea.util.ThrowUtil; +import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.Endpoint; + +import static com.alipay.sofa.jraft.core.State.STATE_ERROR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author jiachun.fjc + */ +public class KVStateMachineTest { + + private static final int APPLY_COUNT = 100; + private static final int SUCCESS_COUNT = 10; + + private RaftGroupService raftGroupService; + private RaftRawKVStore raftRawKVStore; + private File raftDataPath; + + @Before + public void setup() throws IOException, InterruptedException { + final Region region = new Region(); + region.setId(1); + final StoreEngine storeEngine = new MockStoreEngine(); + final KVStoreStateMachine fsm = new KVStoreStateMachine(region, storeEngine); + final NodeOptions nodeOpts = new NodeOptions(); + final Configuration conf = new Configuration(); + conf.addPeer(PeerId.parsePeer("127.0.0.1:8081")); + nodeOpts.setInitialConf(conf); + nodeOpts.setFsm(fsm); + + final String raftDataPath = "raft_st_test"; + this.raftDataPath = new File(raftDataPath); + if (this.raftDataPath.exists()) { + FileUtils.forceDelete(this.raftDataPath); + } + FileUtils.forceMkdir(this.raftDataPath); + + final Path logUri = Paths.get(raftDataPath, "log"); + nodeOpts.setLogUri(logUri.toString()); + + final Path meteUri = Paths.get(raftDataPath, "meta"); + nodeOpts.setRaftMetaUri(meteUri.toString()); + + final Path snapshotUri = Paths.get(raftDataPath, "snapshot"); + nodeOpts.setSnapshotUri(snapshotUri.toString()); + + final Endpoint serverAddress = new Endpoint("127.0.0.1", 8081); + final PeerId serverId = new PeerId(serverAddress, 0); + this.raftGroupService = new RaftGroupService("st_test", serverId, nodeOpts, null, true); + + final Node node = this.raftGroupService.start(false); + + for (int i = 0; i < 100; i++) { + if (node.isLeader()) { + break; + } + Thread.sleep(100); + } + + final RawKVStore rawKVStore = storeEngine.getRawKVStore(); + this.raftRawKVStore = new RaftRawKVStore(node, rawKVStore, null); + } + + @After + public void tearDown() throws IOException { + if (this.raftGroupService != null) { + this.raftGroupService.shutdown(); + try { + this.raftGroupService.join(); + } catch (final InterruptedException e) { + ThrowUtil.throwException(e); + } + } + if (this.raftDataPath.exists()) { + FileUtils.forceDelete(this.raftDataPath); + } + } + + @Test + public void failApplyTest() throws Exception { + final CountDownLatch latch = new CountDownLatch(APPLY_COUNT); + final List closures = new ArrayList<>(); + final BlockingQueue successQueue = new ArrayBlockingQueue<>(APPLY_COUNT); + final BlockingQueue failQueue = new ArrayBlockingQueue<>(APPLY_COUNT); + assertTrue(this.raftGroupService.getRaftNode().isLeader()); + for (int i = 0; i < SUCCESS_COUNT; i++) { + final KVStoreClosure c = new BaseKVStoreClosure() { + + @Override + public void run(Status status) { + latch.countDown(); + successQueue.add(status); + } + }; + closures.add(c); + } + for (int i = SUCCESS_COUNT; i < APPLY_COUNT; i++) { + final KVStoreClosure c = new BaseKVStoreClosure() { + + @Override + public void run(Status status) { + latch.countDown(); + failQueue.add(status); + } + }; + closures.add(c); + } + + for (int i = 0; i < SUCCESS_COUNT; i++) { + final byte[] bytes = BytesUtil.writeUtf8(String.valueOf(i)); + this.raftRawKVStore.put(bytes, bytes, closures.get(i)); + } + + for (int i = SUCCESS_COUNT; i < APPLY_COUNT; i++) { + final byte[] bytes = BytesUtil.writeUtf8(String.valueOf(i)); + this.raftRawKVStore.merge(bytes, bytes, closures.get(i)); + } + + latch.await(); + + final Node node = this.raftGroupService.getRaftNode(); + assertFalse(node.isLeader()); + final Field field = node.getClass().getDeclaredField("state"); + field.setAccessible(true); + assertEquals(field.get(node), STATE_ERROR); + assertEquals(SUCCESS_COUNT, successQueue.size()); + assertEquals(APPLY_COUNT - SUCCESS_COUNT, failQueue.size()); + while (true) { + final Status st = successQueue.poll(); + if (st == null) { + break; + } + assertTrue(st.isOk()); + } + + while (true) { + final Status st = failQueue.poll(); + if (st == null) { + break; + } + assertFalse(st.isOk()); + assertTrue(st.getRaftError() == RaftError.ESTATEMACHINE || st.getRaftError() == RaftError.EPERM); + } + } + + static class MockKVStore extends MemoryRawKVStore { + + private int putIndex = 0; + + @Override + public void put(byte[] key, byte[] value, KVStoreClosure closure) { + if (this.putIndex++ < SUCCESS_COUNT) { + if (closure != null) { + closure.setData(value); + closure.run(Status.OK()); + } + } else { + throw new RuntimeException("fail put test"); + } + } + + @Override + public void merge(byte[] key, byte[] value, KVStoreClosure closure) { + if (this.putIndex++ < SUCCESS_COUNT) { + if (closure != null) { + closure.setData(value); + closure.run(Status.OK()); + } + } else { + throw new RuntimeException("fail merge test"); + } + } + + @Override + void doSnapshotSave(MemoryKVStoreSnapshotFile snapshotFile, String snapshotPath, Region region) + throws Exception { + super.doSnapshotSave(snapshotFile, snapshotPath, region); + snapshotFile.writeToFile(snapshotPath, "putIndex", new PutIndex(this.putIndex)); + } + + @Override + void doSnapshotLoad(MemoryKVStoreSnapshotFile snapshotFile, String snapshotPath) throws Exception { + super.doSnapshotLoad(snapshotFile, snapshotPath); + final PutIndex p = snapshotFile.readFromFile(snapshotPath, "putIndex", PutIndex.class); + this.putIndex = p.data(); + } + + class PutIndex extends MemoryKVStoreSnapshotFile.Persistence { + + public PutIndex(Integer data) { + super(data); + } + } + } + + static class MockStoreEngine extends StoreEngine { + + private final MockKVStore mockKVStore = new MockKVStore(); + private final ExecutorService leaderStateTrigger = Executors.newSingleThreadExecutor(); + + public MockStoreEngine() { + super(new MockPlacementDriverClient()); + } + + @Override + public BatchRawKVStore getRawKVStore() { + return this.mockKVStore; + } + + @Override + public ExecutorService getLeaderStateTrigger() { + return this.leaderStateTrigger; + } + } + + static class MockPlacementDriverClient extends FakePlacementDriverClient { + + public MockPlacementDriverClient() { + super(1, "st_test"); + } + } +}