From fa55f9a9a505047af4b5c3fd21ee5598b66831f7 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Mon, 29 Apr 2019 02:33:14 +0800 Subject: [PATCH 01/16] (bugfix) fix rheakv state machine bug for #136 --- .../jraft/rhea/storage/BaseRawKVStore.java | 53 +++++++++++++++- .../rhea/storage/KVStoreStateMachine.java | 60 ++++++++++--------- .../jraft/rhea/storage/MemoryRawKVStore.java | 22 +++---- .../jraft/rhea/storage/RocksRawKVStore.java | 58 +++++++----------- 4 files changed, 119 insertions(+), 74 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java index 1ccd5b8da..d848b7b7d 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java @@ -25,6 +25,7 @@ import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rhea.errors.Errors; +import com.alipay.sofa.jraft.rhea.errors.StorageException; import com.alipay.sofa.jraft.rhea.metrics.KVMetrics; import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; import com.codahale.metrics.Timer; @@ -76,7 +77,7 @@ public void execute(final NodeExecutor nodeExecutor, final boolean isLeader, fin if (nodeExecutor != null) { nodeExecutor.execute(new Status(RaftError.EIO, "Fail to [EXECUTE]"), isLeader); } - setFailure(closure, "Fail to [EXECUTE]"); + setCriticalError(closure, "Fail to [EXECUTE]", e); } finally { timeCtx.stop(); } @@ -110,6 +111,13 @@ static Timer.Context getTimeContext(final String opName) { return KVMetrics.timer(DB_TIMER, opName).time(); } + /** + * Sets success, if current node is a leader, reply to + * client success with result data response. + * + * @param closure callback + * @param data result data to reply to client + */ static void setSuccess(final KVStoreClosure closure, final Object data) { if (closure != null) { // closure is null on follower node @@ -118,6 +126,13 @@ static void setSuccess(final KVStoreClosure closure, final Object data) { } } + /** + * Sets failure, if current node is a leader, reply to + * client failure response. + * + * @param closure callback + * @param message error message + */ static void setFailure(final KVStoreClosure closure, final String message) { if (closure != null) { // closure is null on follower node @@ -125,4 +140,40 @@ static void setFailure(final KVStoreClosure closure, final String message) { closure.run(new Status(RaftError.EIO, message)); } } + + /** + * Sets critical error and halt the state machine. + * + * if current node is a leader, first reply to client + * failure response. + * + * @param closure callback + * @param message error message + * @param error critical error + */ + static void setCriticalError(final KVStoreClosure closure, final String message, final Throwable error) { + setFailure(closure, message); + if (error != null) { + throw new StorageException(message, error); + } + } + + /** + * Sets critical error and halt the state machine. + * + * if current node is a leader, first reply to client + * failure response. + * + * @param closures callback list + * @param message error message + * @param error critical error + */ + static void setCriticalError(final List closures, final String message, final Throwable error) { + for (final KVStoreClosure c : closures) { + setFailure(c, message); + } + if (error != null) { + throw new StorageException(message, error); + } + } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java index 3d9435804..10a2fe525 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java @@ -79,38 +79,44 @@ public KVStoreStateMachine(Region region, StoreEngine storeEngine) { @Override public void onApply(final Iterator it) { - int stCount = 0; - KVStateOutputList kvStates = KVStateOutputList.newInstance(); - while (it.hasNext()) { - KVOperation kvOp; - final KVClosureAdapter done = (KVClosureAdapter) it.done(); - if (done != null) { - kvOp = done.getOperation(); - } else { - final byte[] data = it.getData().array(); - try { - kvOp = this.serializer.readObject(data, KVOperation.class); - } catch (final Throwable t) { - throw new StoreCodecException("Decode operation error", t); + int index = 0; + try { + KVStateOutputList kvStates = KVStateOutputList.newInstance(); + while (it.hasNext()) { + KVOperation kvOp; + final KVClosureAdapter done = (KVClosureAdapter) it.done(); + if (done != null) { + kvOp = done.getOperation(); + } else { + final byte[] data = it.getData().array(); + try { + kvOp = this.serializer.readObject(data, KVOperation.class); + } catch (final Throwable t) { + ++index; + throw new StoreCodecException("Decode operation error", t); + } + } + final KVState first = kvStates.getFirstElement(); + if (first != null && !first.isSameOp(kvOp)) { + batchApplyAndRecycle(first.getOpByte(), kvStates); + kvStates = KVStateOutputList.newInstance(); } + kvStates.add(KVState.of(kvOp, done)); + ++index; + it.next(); } - final KVState first = kvStates.getFirstElement(); - if (first != null && !first.isSameOp(kvOp)) { + if (!kvStates.isEmpty()) { + final KVState first = kvStates.getFirstElement(); + assert first != null; batchApplyAndRecycle(first.getOpByte(), kvStates); - kvStates = KVStateOutputList.newInstance(); } - kvStates.add(KVState.of(kvOp, done)); - ++stCount; - it.next(); - } - if (!kvStates.isEmpty()) { - final KVState first = kvStates.getFirstElement(); - assert first != null; - batchApplyAndRecycle(first.getOpByte(), kvStates); + } catch (final Throwable t) { + it.setErrorAndRollback(index, + new Status(RaftError.EIO, "StateMachine meet critical error: %s.", t.getMessage())); + } finally { + // metrics: qps + this.applyMeter.mark(index); } - - // metrics: qps - this.applyMeter.mark(stCount); } private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) { diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java index 300d036e8..7645d34b6 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java @@ -180,7 +180,7 @@ public void getSequence(final byte[] seqKey, final int step, final KVStoreClosur } catch (final Exception e) { LOG.error("Fail to [GET_SEQUENCE], [key = {}, step = {}], {}.", Arrays.toString(seqKey), step, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_SEQUENCE]"); + setCriticalError(closure, "Fail to [GET_SEQUENCE]", e); } finally { timeCtx.stop(); } @@ -195,7 +195,7 @@ public void resetSequence(final byte[] seqKey, final KVStoreClosure closure) { } catch (final Exception e) { LOG.error("Fail to [RESET_SEQUENCE], [key = {}], {}.", Arrays.toString(seqKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RESET_SEQUENCE]"); + setCriticalError(closure, "Fail to [RESET_SEQUENCE]", e); } finally { timeCtx.stop(); } @@ -210,7 +210,7 @@ public void put(final byte[] key, final byte[] value, final KVStoreClosure closu } catch (final Exception e) { LOG.error("Fail to [PUT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT]"); + setCriticalError(closure, "Fail to [PUT]", e); } finally { timeCtx.stop(); } @@ -225,7 +225,7 @@ public void getAndPut(final byte[] key, final byte[] value, final KVStoreClosure } catch (final Exception e) { LOG.error("Fail to [GET_PUT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_PUT]"); + setCriticalError(closure, "Fail to [GET_PUT]", e); } finally { timeCtx.stop(); } @@ -250,7 +250,7 @@ public void merge(final byte[] key, final byte[] value, final KVStoreClosure clo } catch (final Exception e) { LOG.error("Fail to [MERGE], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [MERGE]"); + setCriticalError(closure, "Fail to [MERGE]", e); } finally { timeCtx.stop(); } @@ -266,7 +266,7 @@ public void put(final List entries, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Failed to [PUT_LIST], [size = {}], {}.", entries.size(), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_LIST]"); + setCriticalError(closure, "Fail to [PUT_LIST]", e); } finally { timeCtx.stop(); } @@ -281,7 +281,7 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } catch (final Exception e) { LOG.error("Fail to [PUT_IF_ABSENT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_IF_ABSENT]"); + setCriticalError(closure, "Fail to [PUT_IF_ABSENT]", e); } finally { timeCtx.stop(); } @@ -438,7 +438,7 @@ public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [TRY_LOCK], [{}, {}], {}.", Arrays.toString(key), acquirer, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [TRY_LOCK]"); + setCriticalError(closure, "Fail to [TRY_LOCK]", e); } finally { timeCtx.stop(); } @@ -512,7 +512,7 @@ public void releaseLockWith(final byte[] key, final DistributedLock.Acquirer acq setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [RELEASE_LOCK], [{}], {}.", Arrays.toString(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RELEASE_LOCK]"); + setCriticalError(closure, "Fail to [RELEASE_LOCK]", e); } finally { timeCtx.stop(); } @@ -544,7 +544,7 @@ public void delete(final byte[] key, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Fail to [DELETE], [{}], {}.", Arrays.toString(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE]"); + setCriticalError(closure, "Fail to [DELETE]", e); } finally { timeCtx.stop(); } @@ -562,7 +562,7 @@ public void deleteRange(final byte[] startKey, final byte[] endKey, final KVStor } catch (final Exception e) { LOG.error("Fail to [DELETE_RANGE], ['[{}, {})'], {}.", Arrays.toString(startKey), Arrays.toString(endKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE_RANGE]"); + setCriticalError(closure, "Fail to [DELETE_RANGE]", e); } finally { timeCtx.stop(); } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java index e67093f0d..30f7a6c2b 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java @@ -346,7 +346,7 @@ public void getSequence(final byte[] seqKey, final int step, final KVStoreClosur } catch (final Exception e) { LOG.error("Fail to [GET_SEQUENCE], [key = {}, step = {}], {}.", Arrays.toString(seqKey), step, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_SEQUENCE]"); + setCriticalError(closure, "Fail to [GET_SEQUENCE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -364,7 +364,7 @@ public void resetSequence(final byte[] seqKey, final KVStoreClosure closure) { } catch (final Exception e) { LOG.error("Fail to [RESET_SEQUENCE], [key = {}], {}.", Arrays.toString(seqKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RESET_SEQUENCE]"); + setCriticalError(closure, "Fail to [RESET_SEQUENCE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -392,11 +392,9 @@ public void batchResetSequence(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_RESET_SEQUENCE], [size = {}], {}.", - segment.size(), StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_RESET_SEQUENCE]"); - } + LOG.error("Failed to [BATCH_RESET_SEQUENCE], [size = {}], {}.", segment.size(), + StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_RESET_SEQUENCE]", e); } return null; }); @@ -417,7 +415,7 @@ public void put(final byte[] key, final byte[] value, final KVStoreClosure closu } catch (final Exception e) { LOG.error("Fail to [PUT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT]"); + setCriticalError(closure, "Fail to [PUT]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -447,11 +445,8 @@ public void batchPut(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_PUT], [size = {}] {}.", segment.size(), - StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_PUT]"); - } + LOG.error("Failed to [BATCH_PUT], [size = {}] {}.", segment.size(), StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_PUT]", e); } return null; }); @@ -473,7 +468,7 @@ public void getAndPut(final byte[] key, final byte[] value, final KVStoreClosure } catch (final Exception e) { LOG.error("Fail to [GET_PUT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_PUT]"); + setCriticalError(closure, "Fail to [GET_PUT]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -509,10 +504,8 @@ public void batchGetAndPut(final KVStateOutputList kvStates) { } } catch (final Exception e) { LOG.error("Failed to [BATCH_GET_PUT], [size = {}] {}.", segment.size(), - StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_GET_PUT]"); - } + StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_GET_PUT]", e); } return null; }); @@ -533,7 +526,7 @@ public void merge(final byte[] key, final byte[] value, final KVStoreClosure clo } catch (final Exception e) { LOG.error("Fail to [MERGE], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [MERGE]"); + setCriticalError(closure, "Fail to [MERGE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -563,11 +556,8 @@ public void batchMerge(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_MERGE], [size = {}] {}.", segment.size(), - StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_MERGE]"); - } + LOG.error("Failed to [BATCH_MERGE], [size = {}] {}.", segment.size(), StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_MERGE]", e); } return null; }); @@ -590,7 +580,7 @@ public void put(final List entries, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Failed to [PUT_LIST], [size = {}], {}.", entries.size(), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_LIST]"); + setCriticalError(closure, "Fail to [PUT_LIST]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -611,7 +601,7 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } catch (final Exception e) { LOG.error("Fail to [PUT_IF_ABSENT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_IF_ABSENT]"); + setCriticalError(closure, "Fail to [PUT_IF_ABSENT]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -772,7 +762,7 @@ public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [TRY_LOCK], [{}, {}], {}.", Arrays.toString(key), acquirer, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [TRY_LOCK]"); + setCriticalError(closure, "Fail to [TRY_LOCK]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -851,7 +841,7 @@ public void releaseLockWith(final byte[] key, final DistributedLock.Acquirer acq setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [RELEASE_LOCK], [{}], {}.", Arrays.toString(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RELEASE_LOCK]"); + setCriticalError(closure, "Fail to [RELEASE_LOCK]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -895,7 +885,7 @@ public void delete(final byte[] key, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Fail to [DELETE], [{}], {}.", Arrays.toString(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE]"); + setCriticalError(closure, "Fail to [DELETE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -923,11 +913,9 @@ public void batchDelete(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_DELETE], [size = {}], {}.", - segment.size(), StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_DELETE]"); - } + LOG.error("Failed to [BATCH_DELETE], [size = {}], {}.", segment.size(), + StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_DELETE]", e); } return null; }); @@ -948,7 +936,7 @@ public void deleteRange(final byte[] startKey, final byte[] endKey, final KVStor } catch (final Exception e) { LOG.error("Fail to [DELETE_RANGE], ['[{}, {})'], {}.", Arrays.toString(startKey), Arrays.toString(endKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE_RANGE]"); + setCriticalError(closure, "Fail to [DELETE_RANGE]", e); } finally { readLock.unlock(); timeCtx.stop(); From dc0a7a80d75f715f73d563a54e99f5c76be29664 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Mon, 29 Apr 2019 03:01:15 +0800 Subject: [PATCH 02/16] (fix) typo --- .../com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java index d848b7b7d..81726c8ad 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java @@ -144,7 +144,7 @@ static void setFailure(final KVStoreClosure closure, final String message) { /** * Sets critical error and halt the state machine. * - * if current node is a leader, first reply to client + * If current node is a leader, first reply to client * failure response. * * @param closure callback @@ -161,7 +161,7 @@ static void setCriticalError(final KVStoreClosure closure, final String message, /** * Sets critical error and halt the state machine. * - * if current node is a leader, first reply to client + * If current node is a leader, first reply to client * failure response. * * @param closures callback list From 0b053ddff7d43f3978fd1061894b900606344538 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Mon, 29 Apr 2019 04:18:42 +0800 Subject: [PATCH 03/16] (bugfix) fix rheakv state machine bug for #136 --- .../jraft/rhea/DefaultRegionKVService.java | 3 ++- .../rhea/client/DefaultRheaKVCliService.java | 3 +-- .../jraft/rhea/cmd/store/BaseResponse.java | 13 +++++++++-- .../jraft/rhea/storage/BaseRawKVStore.java | 15 +++++++++--- .../rhea/storage/KVStoreStateMachine.java | 23 +++++++++++-------- 5 files changed, 40 insertions(+), 17 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java index 37d073bf6..8ae41fcfc 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java @@ -565,7 +565,8 @@ public void run(final Status status) { private static void setFailure(final BaseRequest request, final BaseResponse response, final Status status, final Errors error) { - response.setError(error); + response.setError(error == null ? Errors.STORAGE_ERROR : error); + response.setErrorMsg(status.toString()); LOG.error("Failed to handle: {}, status: {}, error: {}.", request, status, error); } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java index 6a4e36701..fa04c99fd 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java @@ -84,8 +84,7 @@ public Status rangeSplit(final long regionId, final long newRegionId, final Stri if (response.isSuccess()) { return Status.OK(); } - return new Status(-1, "fail to range split on region %d, error: %s", regionId, String.valueOf(response - .getError())); + return new Status(-1, "Fail to range split on region %d, error: %s", regionId, String.valueOf(response)); } catch (final Exception e) { LOG.error("Fail to range split on exception: {}.", StackTraceUtil.stackTrace(e)); return new Status(-1, "fail to range split on region %d", regionId); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java index a50d96f9d..a568d5b7f 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java @@ -31,6 +31,7 @@ public class BaseResponse implements Serializable { private static final long serialVersionUID = 8411573936817037697L; private Errors error = Errors.NONE; + private String errorMsg; private long regionId; private RegionEpoch regionEpoch; private T value; @@ -47,6 +48,14 @@ public void setError(Errors error) { this.error = error; } + public String getErrorMsg() { + return errorMsg; + } + + public void setErrorMsg(String errorMsg) { + this.errorMsg = errorMsg; + } + public long getRegionId() { return regionId; } @@ -73,7 +82,7 @@ public void setValue(T value) { @Override public String toString() { - return "BaseResponse{" + "error=" + error + ", regionId=" + regionId + ", regionEpoch=" + regionEpoch - + ", value=" + value + '}'; + return "BaseResponse{" + "error=" + error + ", errorMsg='" + errorMsg + '\'' + ", regionId=" + regionId + + ", regionEpoch=" + regionEpoch + ", value=" + value + '}'; } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java index 81726c8ad..f46f8791b 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java @@ -152,7 +152,8 @@ static void setFailure(final KVStoreClosure closure, final String message) { * @param error critical error */ static void setCriticalError(final KVStoreClosure closure, final String message, final Throwable error) { - setFailure(closure, message); + setClosureError(closure); + // Will call closure#run in FSMCaller if (error != null) { throw new StorageException(message, error); } @@ -169,11 +170,19 @@ static void setCriticalError(final KVStoreClosure closure, final String message, * @param error critical error */ static void setCriticalError(final List closures, final String message, final Throwable error) { - for (final KVStoreClosure c : closures) { - setFailure(c, message); + for (final KVStoreClosure closure : closures) { + // Will call closure#run in FSMCaller + setClosureError(closure); } if (error != null) { throw new StorageException(message, error); } } + + static void setClosureError(final KVStoreClosure closure) { + if (closure != null) { + // closure is null on follower node + closure.setError(Errors.STORAGE_ERROR); + } + } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java index 10a2fe525..f03d6a44f 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java @@ -80,6 +80,7 @@ public KVStoreStateMachine(Region region, StoreEngine storeEngine) { @Override public void onApply(final Iterator it) { int index = 0; + int applied = 0; try { KVStateOutputList kvStates = KVStateOutputList.newInstance(); while (it.hasNext()) { @@ -98,7 +99,7 @@ public void onApply(final Iterator it) { } final KVState first = kvStates.getFirstElement(); if (first != null && !first.isSameOp(kvOp)) { - batchApplyAndRecycle(first.getOpByte(), kvStates); + applied += batchApplyAndRecycle(first.getOpByte(), kvStates); kvStates = KVStateOutputList.newInstance(); } kvStates.add(KVState.of(kvOp, done)); @@ -108,22 +109,25 @@ public void onApply(final Iterator it) { if (!kvStates.isEmpty()) { final KVState first = kvStates.getFirstElement(); assert first != null; - batchApplyAndRecycle(first.getOpByte(), kvStates); + applied += batchApplyAndRecycle(first.getOpByte(), kvStates); } } catch (final Throwable t) { - it.setErrorAndRollback(index, - new Status(RaftError.EIO, "StateMachine meet critical error: %s.", t.getMessage())); + it.setErrorAndRollback(index - applied, new Status(RaftError.EIO, "StateMachine meet critical error: %s.", + t.getMessage())); } finally { // metrics: qps - this.applyMeter.mark(index); + this.applyMeter.mark(applied); } } - private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) { + private int batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) { try { - if (kvStates.isEmpty()) { - return; + final int size = kvStates.size(); + + if (size == 0) { + return 0; } + if (!KVOperation.isValidOp(opByte)) { throw new IllegalKVOperationException("Unknown operation: " + opByte); } @@ -131,12 +135,13 @@ private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvS // metrics: op qps final Meter opApplyMeter = KVMetrics.meter(STATE_MACHINE_APPLY_QPS, String.valueOf(this.region.getId()), KVOperation.opName(opByte)); - final int size = kvStates.size(); opApplyMeter.mark(size); this.batchWriteHistogram.update(size); // do batch apply batchApply(opByte, kvStates); + + return size; } finally { RecycleUtil.recycle(kvStates); } From 98f1b347841d04b90265de2ec8c9589e51d01e17 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Mon, 29 Apr 2019 17:49:46 +0800 Subject: [PATCH 04/16] (feat) add unit test --- .../alipay/sofa/jraft/core/IteratorImpl.java | 2 +- .../rhea/storage/KVStateMachineTest.java | 245 ++++++++++++++++++ 2 files changed, 246 insertions(+), 1 deletion(-) create mode 100644 jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java index 0891a90b8..a272b2fc0 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java @@ -147,7 +147,7 @@ public void setErrorAndRollback(final long ntail, final Status st) { this.currEntry = null; getOrCreateError().setType(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE); getOrCreateError().getStatus().setError(RaftError.ESTATEMACHINE, - "StateMachine meet critical error when applying one or more tasks since index=%d, %s", this.currentIndex, + "StateMachine meet critical error when applying one or more tasks since index=%d, %s", this.currentIndex, st != null ? st.toString() : "none"); } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java new file mode 100644 index 000000000..7da01fba2 --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.io.FileUtils; +import org.junit.Before; +import org.junit.Test; + +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rhea.StoreEngine; +import com.alipay.sofa.jraft.rhea.client.pd.FakePlacementDriverClient; +import com.alipay.sofa.jraft.rhea.metadata.Region; +import com.alipay.sofa.jraft.rhea.util.ThrowUtil; +import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.Endpoint; + +import static com.alipay.sofa.jraft.core.State.STATE_ERROR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author jiachun.fjc + */ +public class KVStateMachineTest { + + private static final int APPLY_COUNT = 100; + private static final int SUCCESS_COUNT = 10; + + private RaftGroupService raftGroupService; + private RaftRawKVStore raftRawKVStore; + private File raftDataPath; + + @Before + public void setup() throws IOException, InterruptedException { + final Region region = new Region(); + region.setId(1); + final StoreEngine storeEngine = new MockStoreEngine(); + final KVStoreStateMachine fsm = new KVStoreStateMachine(region, storeEngine); + final NodeOptions nodeOpts = new NodeOptions(); + final Configuration conf = new Configuration(); + conf.addPeer(PeerId.parsePeer("127.0.0.1:8081")); + nodeOpts.setInitialConf(conf); + nodeOpts.setFsm(fsm); + + final String raftDataPath = "raft_st_test"; + this.raftDataPath = new File(raftDataPath); + FileUtils.forceDelete(this.raftDataPath); + FileUtils.forceMkdir(this.raftDataPath); + + final Path logUri = Paths.get(raftDataPath, "log"); + nodeOpts.setLogUri(logUri.toString()); + + final Path meteUri = Paths.get(raftDataPath, "meta"); + nodeOpts.setRaftMetaUri(meteUri.toString()); + + final Path snapshotUri = Paths.get(raftDataPath, "snapshot"); + nodeOpts.setSnapshotUri(snapshotUri.toString()); + + final Endpoint serverAddress = new Endpoint("127.0.0.1", 8081); + final PeerId serverId = new PeerId(serverAddress, 0); + this.raftGroupService = new RaftGroupService("st_test", serverId, nodeOpts, null, true); + + final Node node = this.raftGroupService.start(false); + + for (int i = 0; i < 100; i++) { + if (node.isLeader()) { + break; + } + Thread.sleep(100); + } + + final RawKVStore rawKVStore = storeEngine.getRawKVStore(); + this.raftRawKVStore = new RaftRawKVStore(node, rawKVStore, null); + } + + @Test + public void tearDown() throws IOException { + if (this.raftGroupService != null) { + this.raftGroupService.shutdown(); + try { + this.raftGroupService.join(); + } catch (final InterruptedException e) { + ThrowUtil.throwException(e); + } + } + FileUtils.forceDelete(this.raftDataPath); + } + + @Test + public void failApplyTest() throws Exception { + final CountDownLatch latch = new CountDownLatch(APPLY_COUNT); + final List closures = new ArrayList<>(); + final BlockingQueue successQueue = new ArrayBlockingQueue<>(APPLY_COUNT); + final BlockingQueue failQueue = new ArrayBlockingQueue<>(APPLY_COUNT); + assertTrue(this.raftGroupService.getRaftNode().isLeader()); + for (int i = 0; i < SUCCESS_COUNT; i++) { + final KVStoreClosure c = new BaseKVStoreClosure() { + + @Override + public void run(Status status) { + latch.countDown(); + successQueue.add(status); + } + }; + closures.add(c); + } + for (int i = SUCCESS_COUNT; i < APPLY_COUNT; i++) { + final KVStoreClosure c = new BaseKVStoreClosure() { + + @Override + public void run(Status status) { + latch.countDown(); + failQueue.add(status); + } + }; + closures.add(c); + } + + for (int i = 0; i < SUCCESS_COUNT; i++) { + final byte[] bytes = BytesUtil.writeUtf8(String.valueOf(i)); + this.raftRawKVStore.put(bytes, bytes, closures.get(i)); + } + + for (int i = SUCCESS_COUNT; i < APPLY_COUNT; i++) { + final byte[] bytes = BytesUtil.writeUtf8(String.valueOf(i)); + this.raftRawKVStore.merge(bytes, bytes, closures.get(i)); + } + + latch.await(); + + final Node node = this.raftGroupService.getRaftNode(); + assertFalse(node.isLeader()); + final Field field = node.getClass().getDeclaredField("state"); + field.setAccessible(true); + assertEquals(field.get(node), STATE_ERROR); + assertEquals(SUCCESS_COUNT, successQueue.size()); + assertEquals(APPLY_COUNT - SUCCESS_COUNT, failQueue.size()); + while (true) { + final Status st = successQueue.poll(); + if (st == null) { + break; + } + assertTrue(st.isOk()); + } + + while (true) { + final Status st = failQueue.poll(); + if (st == null) { + break; + } + assertFalse(st.isOk()); + assertEquals(RaftError.ESTATEMACHINE, st.getRaftError()); + } + } + + static class MockKVStore extends MemoryRawKVStore { + + private int putIndex = 0; + + @Override + public void put(byte[] key, byte[] value, KVStoreClosure closure) { + if (this.putIndex++ < SUCCESS_COUNT) { + if (closure != null) { + closure.setData(value); + closure.run(Status.OK()); + } + } else { + throw new RuntimeException("fail test"); + } + } + + @Override + public void merge(byte[] key, byte[] value, KVStoreClosure closure) { + if (this.putIndex++ < SUCCESS_COUNT) { + if (closure != null) { + closure.setData(value); + closure.run(Status.OK()); + } + } else { + throw new RuntimeException("fail test"); + } + } + } + + static class MockStoreEngine extends StoreEngine { + + private final MockKVStore mockKVStore = new MockKVStore(); + private final ExecutorService leaderStateTrigger = Executors.newSingleThreadExecutor(); + + public MockStoreEngine() { + super(new MockPlacementDriverClient()); + } + + @Override + public BatchRawKVStore getRawKVStore() { + return this.mockKVStore; + } + + @Override + public ExecutorService getLeaderStateTrigger() { + return this.leaderStateTrigger; + } + } + + static class MockPlacementDriverClient extends FakePlacementDriverClient { + + public MockPlacementDriverClient() { + super(1, "st_test"); + } + } +} From f47327d7729329ef18dc7014eaa75aafd6f2653a Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 30 Apr 2019 12:35:15 +0800 Subject: [PATCH 05/16] (fix) fix unit test --- .../java/com/alipay/sofa/jraft/core/IteratorImplTest.java | 4 ++-- .../test/java/com/alipay/sofa/jraft/core/IteratorTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java index a61a2d9a6..2de7e32e4 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java @@ -113,7 +113,7 @@ public void testRunTheRestClosureWithError() throws Exception { final Status s = mc.s; Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), s.getCode()); assertEquals( - "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", + "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", s.getErrorMsg()); } } @@ -129,7 +129,7 @@ public void testSetErrorAndRollback() { Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), iter.getError().getStatus().getCode()); Assert .assertEquals( - "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", + "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", iter.getError().getStatus().getErrorMsg()); assertEquals(6, iter.getIndex()); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java index d4061cacb..a7221931b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java @@ -107,7 +107,7 @@ public void testSetErrorAndRollback() { Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), iterImpl.getError().getStatus().getCode()); Assert .assertEquals( - "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", + "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", iterImpl.getError().getStatus().getErrorMsg()); assertEquals(6, iter.getIndex()); } From f77e3be266710140a11a32ae01fb9c41ff90daf3 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 30 Apr 2019 13:18:43 +0800 Subject: [PATCH 06/16] (fix) fix unit test --- .../sofa/jraft/rhea/storage/KVStateMachineTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java index 7da01fba2..6e54036e6 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -78,7 +79,9 @@ public void setup() throws IOException, InterruptedException { final String raftDataPath = "raft_st_test"; this.raftDataPath = new File(raftDataPath); - FileUtils.forceDelete(this.raftDataPath); + if (this.raftDataPath.exists()) { + FileUtils.forceDelete(this.raftDataPath); + } FileUtils.forceMkdir(this.raftDataPath); final Path logUri = Paths.get(raftDataPath, "log"); @@ -107,7 +110,7 @@ public void setup() throws IOException, InterruptedException { this.raftRawKVStore = new RaftRawKVStore(node, rawKVStore, null); } - @Test + @After public void tearDown() throws IOException { if (this.raftGroupService != null) { this.raftGroupService.shutdown(); @@ -117,7 +120,9 @@ public void tearDown() throws IOException { ThrowUtil.throwException(e); } } - FileUtils.forceDelete(this.raftDataPath); + if (this.raftDataPath.exists()) { + FileUtils.forceDelete(this.raftDataPath); + } } @Test From 4d643981913c239366e4e9cd06b857d78f3fb993 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Sun, 5 May 2019 13:04:16 +0800 Subject: [PATCH 07/16] (fix) fix unit test --- .../com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java index 6e54036e6..805a973cc 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java @@ -188,7 +188,7 @@ public void run(Status status) { break; } assertFalse(st.isOk()); - assertEquals(RaftError.ESTATEMACHINE, st.getRaftError()); + assertTrue(st.getRaftError() == RaftError.ESTATEMACHINE || st.getRaftError() == RaftError.EPERM); } } From e007a5bb2dea54d40e9851407c3ffed0bfc17951 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Mon, 29 Apr 2019 02:33:14 +0800 Subject: [PATCH 08/16] (bugfix) fix rheakv state machine bug for #136 --- .../jraft/rhea/storage/BaseRawKVStore.java | 53 ++++++++++++++- .../rhea/storage/KVStoreStateMachine.java | 66 ++++++++++--------- .../jraft/rhea/storage/MemoryRawKVStore.java | 22 +++---- .../jraft/rhea/storage/RocksRawKVStore.java | 58 +++++++--------- 4 files changed, 122 insertions(+), 77 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java index 1ccd5b8da..d848b7b7d 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java @@ -25,6 +25,7 @@ import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rhea.errors.Errors; +import com.alipay.sofa.jraft.rhea.errors.StorageException; import com.alipay.sofa.jraft.rhea.metrics.KVMetrics; import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; import com.codahale.metrics.Timer; @@ -76,7 +77,7 @@ public void execute(final NodeExecutor nodeExecutor, final boolean isLeader, fin if (nodeExecutor != null) { nodeExecutor.execute(new Status(RaftError.EIO, "Fail to [EXECUTE]"), isLeader); } - setFailure(closure, "Fail to [EXECUTE]"); + setCriticalError(closure, "Fail to [EXECUTE]", e); } finally { timeCtx.stop(); } @@ -110,6 +111,13 @@ static Timer.Context getTimeContext(final String opName) { return KVMetrics.timer(DB_TIMER, opName).time(); } + /** + * Sets success, if current node is a leader, reply to + * client success with result data response. + * + * @param closure callback + * @param data result data to reply to client + */ static void setSuccess(final KVStoreClosure closure, final Object data) { if (closure != null) { // closure is null on follower node @@ -118,6 +126,13 @@ static void setSuccess(final KVStoreClosure closure, final Object data) { } } + /** + * Sets failure, if current node is a leader, reply to + * client failure response. + * + * @param closure callback + * @param message error message + */ static void setFailure(final KVStoreClosure closure, final String message) { if (closure != null) { // closure is null on follower node @@ -125,4 +140,40 @@ static void setFailure(final KVStoreClosure closure, final String message) { closure.run(new Status(RaftError.EIO, message)); } } + + /** + * Sets critical error and halt the state machine. + * + * if current node is a leader, first reply to client + * failure response. + * + * @param closure callback + * @param message error message + * @param error critical error + */ + static void setCriticalError(final KVStoreClosure closure, final String message, final Throwable error) { + setFailure(closure, message); + if (error != null) { + throw new StorageException(message, error); + } + } + + /** + * Sets critical error and halt the state machine. + * + * if current node is a leader, first reply to client + * failure response. + * + * @param closures callback list + * @param message error message + * @param error critical error + */ + static void setCriticalError(final List closures, final String message, final Throwable error) { + for (final KVStoreClosure c : closures) { + setFailure(c, message); + } + if (error != null) { + throw new StorageException(message, error); + } + } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java index 39d9a4519..e898a0f16 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java @@ -80,42 +80,48 @@ public KVStoreStateMachine(Region region, StoreEngine storeEngine) { @Override public void onApply(final Iterator it) { - int stCount = 0; - KVStateOutputList kvStates = KVStateOutputList.newInstance(); - while (it.hasNext()) { - KVOperation kvOp; - final KVClosureAdapter done = (KVClosureAdapter) it.done(); - if (done != null) { - kvOp = done.getOperation(); - } else { - final ByteBuffer buf = it.getData(); - try { - if (buf.hasArray()) { - kvOp = this.serializer.readObject(buf.array(), KVOperation.class); - } else { - kvOp = this.serializer.readObject(buf, KVOperation.class); + int index = 0; + try { + KVStateOutputList kvStates = KVStateOutputList.newInstance(); + while (it.hasNext()) { + KVOperation kvOp; + final KVClosureAdapter done = (KVClosureAdapter) it.done(); + if (done != null) { + kvOp = done.getOperation(); + } else { + final ByteBuffer buf = it.getData(); + try { + if (buf.hasArray()) { + kvOp = this.serializer.readObject(buf.array(), KVOperation.class); + } else { + kvOp = this.serializer.readObject(buf, KVOperation.class); + } + } catch (final Throwable t) { + ++index; + throw new StoreCodecException("Decode operation error", t); } - } catch (final Throwable t) { - throw new StoreCodecException("Decode operation error", t); } + final KVState first = kvStates.getFirstElement(); + if (first != null && !first.isSameOp(kvOp)) { + batchApplyAndRecycle(first.getOpByte(), kvStates); + kvStates = KVStateOutputList.newInstance(); + } + kvStates.add(KVState.of(kvOp, done)); + ++index; + it.next(); } - final KVState first = kvStates.getFirstElement(); - if (first != null && !first.isSameOp(kvOp)) { + if (!kvStates.isEmpty()) { + final KVState first = kvStates.getFirstElement(); + assert first != null; batchApplyAndRecycle(first.getOpByte(), kvStates); - kvStates = KVStateOutputList.newInstance(); } - kvStates.add(KVState.of(kvOp, done)); - ++stCount; - it.next(); - } - if (!kvStates.isEmpty()) { - final KVState first = kvStates.getFirstElement(); - assert first != null; - batchApplyAndRecycle(first.getOpByte(), kvStates); + } catch (final Throwable t) { + it.setErrorAndRollback(index, + new Status(RaftError.EIO, "StateMachine meet critical error: %s.", t.getMessage())); + } finally { + // metrics: qps + this.applyMeter.mark(index); } - - // metrics: qps - this.applyMeter.mark(stCount); } private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) { diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java index 300d036e8..7645d34b6 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java @@ -180,7 +180,7 @@ public void getSequence(final byte[] seqKey, final int step, final KVStoreClosur } catch (final Exception e) { LOG.error("Fail to [GET_SEQUENCE], [key = {}, step = {}], {}.", Arrays.toString(seqKey), step, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_SEQUENCE]"); + setCriticalError(closure, "Fail to [GET_SEQUENCE]", e); } finally { timeCtx.stop(); } @@ -195,7 +195,7 @@ public void resetSequence(final byte[] seqKey, final KVStoreClosure closure) { } catch (final Exception e) { LOG.error("Fail to [RESET_SEQUENCE], [key = {}], {}.", Arrays.toString(seqKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RESET_SEQUENCE]"); + setCriticalError(closure, "Fail to [RESET_SEQUENCE]", e); } finally { timeCtx.stop(); } @@ -210,7 +210,7 @@ public void put(final byte[] key, final byte[] value, final KVStoreClosure closu } catch (final Exception e) { LOG.error("Fail to [PUT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT]"); + setCriticalError(closure, "Fail to [PUT]", e); } finally { timeCtx.stop(); } @@ -225,7 +225,7 @@ public void getAndPut(final byte[] key, final byte[] value, final KVStoreClosure } catch (final Exception e) { LOG.error("Fail to [GET_PUT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_PUT]"); + setCriticalError(closure, "Fail to [GET_PUT]", e); } finally { timeCtx.stop(); } @@ -250,7 +250,7 @@ public void merge(final byte[] key, final byte[] value, final KVStoreClosure clo } catch (final Exception e) { LOG.error("Fail to [MERGE], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [MERGE]"); + setCriticalError(closure, "Fail to [MERGE]", e); } finally { timeCtx.stop(); } @@ -266,7 +266,7 @@ public void put(final List entries, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Failed to [PUT_LIST], [size = {}], {}.", entries.size(), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_LIST]"); + setCriticalError(closure, "Fail to [PUT_LIST]", e); } finally { timeCtx.stop(); } @@ -281,7 +281,7 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } catch (final Exception e) { LOG.error("Fail to [PUT_IF_ABSENT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_IF_ABSENT]"); + setCriticalError(closure, "Fail to [PUT_IF_ABSENT]", e); } finally { timeCtx.stop(); } @@ -438,7 +438,7 @@ public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [TRY_LOCK], [{}, {}], {}.", Arrays.toString(key), acquirer, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [TRY_LOCK]"); + setCriticalError(closure, "Fail to [TRY_LOCK]", e); } finally { timeCtx.stop(); } @@ -512,7 +512,7 @@ public void releaseLockWith(final byte[] key, final DistributedLock.Acquirer acq setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [RELEASE_LOCK], [{}], {}.", Arrays.toString(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RELEASE_LOCK]"); + setCriticalError(closure, "Fail to [RELEASE_LOCK]", e); } finally { timeCtx.stop(); } @@ -544,7 +544,7 @@ public void delete(final byte[] key, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Fail to [DELETE], [{}], {}.", Arrays.toString(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE]"); + setCriticalError(closure, "Fail to [DELETE]", e); } finally { timeCtx.stop(); } @@ -562,7 +562,7 @@ public void deleteRange(final byte[] startKey, final byte[] endKey, final KVStor } catch (final Exception e) { LOG.error("Fail to [DELETE_RANGE], ['[{}, {})'], {}.", Arrays.toString(startKey), Arrays.toString(endKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE_RANGE]"); + setCriticalError(closure, "Fail to [DELETE_RANGE]", e); } finally { timeCtx.stop(); } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java index e67093f0d..30f7a6c2b 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java @@ -346,7 +346,7 @@ public void getSequence(final byte[] seqKey, final int step, final KVStoreClosur } catch (final Exception e) { LOG.error("Fail to [GET_SEQUENCE], [key = {}, step = {}], {}.", Arrays.toString(seqKey), step, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_SEQUENCE]"); + setCriticalError(closure, "Fail to [GET_SEQUENCE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -364,7 +364,7 @@ public void resetSequence(final byte[] seqKey, final KVStoreClosure closure) { } catch (final Exception e) { LOG.error("Fail to [RESET_SEQUENCE], [key = {}], {}.", Arrays.toString(seqKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RESET_SEQUENCE]"); + setCriticalError(closure, "Fail to [RESET_SEQUENCE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -392,11 +392,9 @@ public void batchResetSequence(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_RESET_SEQUENCE], [size = {}], {}.", - segment.size(), StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_RESET_SEQUENCE]"); - } + LOG.error("Failed to [BATCH_RESET_SEQUENCE], [size = {}], {}.", segment.size(), + StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_RESET_SEQUENCE]", e); } return null; }); @@ -417,7 +415,7 @@ public void put(final byte[] key, final byte[] value, final KVStoreClosure closu } catch (final Exception e) { LOG.error("Fail to [PUT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT]"); + setCriticalError(closure, "Fail to [PUT]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -447,11 +445,8 @@ public void batchPut(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_PUT], [size = {}] {}.", segment.size(), - StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_PUT]"); - } + LOG.error("Failed to [BATCH_PUT], [size = {}] {}.", segment.size(), StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_PUT]", e); } return null; }); @@ -473,7 +468,7 @@ public void getAndPut(final byte[] key, final byte[] value, final KVStoreClosure } catch (final Exception e) { LOG.error("Fail to [GET_PUT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [GET_PUT]"); + setCriticalError(closure, "Fail to [GET_PUT]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -509,10 +504,8 @@ public void batchGetAndPut(final KVStateOutputList kvStates) { } } catch (final Exception e) { LOG.error("Failed to [BATCH_GET_PUT], [size = {}] {}.", segment.size(), - StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_GET_PUT]"); - } + StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_GET_PUT]", e); } return null; }); @@ -533,7 +526,7 @@ public void merge(final byte[] key, final byte[] value, final KVStoreClosure clo } catch (final Exception e) { LOG.error("Fail to [MERGE], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [MERGE]"); + setCriticalError(closure, "Fail to [MERGE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -563,11 +556,8 @@ public void batchMerge(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_MERGE], [size = {}] {}.", segment.size(), - StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_MERGE]"); - } + LOG.error("Failed to [BATCH_MERGE], [size = {}] {}.", segment.size(), StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_MERGE]", e); } return null; }); @@ -590,7 +580,7 @@ public void put(final List entries, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Failed to [PUT_LIST], [size = {}], {}.", entries.size(), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_LIST]"); + setCriticalError(closure, "Fail to [PUT_LIST]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -611,7 +601,7 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu } catch (final Exception e) { LOG.error("Fail to [PUT_IF_ABSENT], [{}, {}], {}.", Arrays.toString(key), Arrays.toString(value), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [PUT_IF_ABSENT]"); + setCriticalError(closure, "Fail to [PUT_IF_ABSENT]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -772,7 +762,7 @@ public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [TRY_LOCK], [{}, {}], {}.", Arrays.toString(key), acquirer, StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [TRY_LOCK]"); + setCriticalError(closure, "Fail to [TRY_LOCK]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -851,7 +841,7 @@ public void releaseLockWith(final byte[] key, final DistributedLock.Acquirer acq setSuccess(closure, owner); } catch (final Exception e) { LOG.error("Fail to [RELEASE_LOCK], [{}], {}.", Arrays.toString(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [RELEASE_LOCK]"); + setCriticalError(closure, "Fail to [RELEASE_LOCK]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -895,7 +885,7 @@ public void delete(final byte[] key, final KVStoreClosure closure) { setSuccess(closure, Boolean.TRUE); } catch (final Exception e) { LOG.error("Fail to [DELETE], [{}], {}.", Arrays.toString(key), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE]"); + setCriticalError(closure, "Fail to [DELETE]", e); } finally { readLock.unlock(); timeCtx.stop(); @@ -923,11 +913,9 @@ public void batchDelete(final KVStateOutputList kvStates) { setSuccess(kvState.getDone(), Boolean.TRUE); } } catch (final Exception e) { - LOG.error("Failed to [BATCH_DELETE], [size = {}], {}.", - segment.size(), StackTraceUtil.stackTrace(e)); - for (final KVState kvState : segment) { - setFailure(kvState.getDone(), "Fail to [BATCH_DELETE]"); - } + LOG.error("Failed to [BATCH_DELETE], [size = {}], {}.", segment.size(), + StackTraceUtil.stackTrace(e)); + setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_DELETE]", e); } return null; }); @@ -948,7 +936,7 @@ public void deleteRange(final byte[] startKey, final byte[] endKey, final KVStor } catch (final Exception e) { LOG.error("Fail to [DELETE_RANGE], ['[{}, {})'], {}.", Arrays.toString(startKey), Arrays.toString(endKey), StackTraceUtil.stackTrace(e)); - setFailure(closure, "Fail to [DELETE_RANGE]"); + setCriticalError(closure, "Fail to [DELETE_RANGE]", e); } finally { readLock.unlock(); timeCtx.stop(); From 3a09c50bc0eff7f7e4dbeeb7842db7d4dd84c341 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Mon, 29 Apr 2019 03:01:15 +0800 Subject: [PATCH 09/16] (fix) typo --- .../com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java index d848b7b7d..81726c8ad 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java @@ -144,7 +144,7 @@ static void setFailure(final KVStoreClosure closure, final String message) { /** * Sets critical error and halt the state machine. * - * if current node is a leader, first reply to client + * If current node is a leader, first reply to client * failure response. * * @param closure callback @@ -161,7 +161,7 @@ static void setCriticalError(final KVStoreClosure closure, final String message, /** * Sets critical error and halt the state machine. * - * if current node is a leader, first reply to client + * If current node is a leader, first reply to client * failure response. * * @param closures callback list From 25f431a5b185305e3bd146c0bb32819182088fa6 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Mon, 29 Apr 2019 04:18:42 +0800 Subject: [PATCH 10/16] (bugfix) fix rheakv state machine bug for #136 --- .../jraft/rhea/DefaultRegionKVService.java | 3 ++- .../rhea/client/DefaultRheaKVCliService.java | 3 +-- .../jraft/rhea/cmd/store/BaseResponse.java | 13 +++++++++-- .../jraft/rhea/storage/BaseRawKVStore.java | 15 +++++++++--- .../rhea/storage/KVStoreStateMachine.java | 23 +++++++++++-------- 5 files changed, 40 insertions(+), 17 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java index 37d073bf6..8ae41fcfc 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java @@ -565,7 +565,8 @@ public void run(final Status status) { private static void setFailure(final BaseRequest request, final BaseResponse response, final Status status, final Errors error) { - response.setError(error); + response.setError(error == null ? Errors.STORAGE_ERROR : error); + response.setErrorMsg(status.toString()); LOG.error("Failed to handle: {}, status: {}, error: {}.", request, status, error); } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java index 6a4e36701..fa04c99fd 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java @@ -84,8 +84,7 @@ public Status rangeSplit(final long regionId, final long newRegionId, final Stri if (response.isSuccess()) { return Status.OK(); } - return new Status(-1, "fail to range split on region %d, error: %s", regionId, String.valueOf(response - .getError())); + return new Status(-1, "Fail to range split on region %d, error: %s", regionId, String.valueOf(response)); } catch (final Exception e) { LOG.error("Fail to range split on exception: {}.", StackTraceUtil.stackTrace(e)); return new Status(-1, "fail to range split on region %d", regionId); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java index a50d96f9d..a568d5b7f 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/BaseResponse.java @@ -31,6 +31,7 @@ public class BaseResponse implements Serializable { private static final long serialVersionUID = 8411573936817037697L; private Errors error = Errors.NONE; + private String errorMsg; private long regionId; private RegionEpoch regionEpoch; private T value; @@ -47,6 +48,14 @@ public void setError(Errors error) { this.error = error; } + public String getErrorMsg() { + return errorMsg; + } + + public void setErrorMsg(String errorMsg) { + this.errorMsg = errorMsg; + } + public long getRegionId() { return regionId; } @@ -73,7 +82,7 @@ public void setValue(T value) { @Override public String toString() { - return "BaseResponse{" + "error=" + error + ", regionId=" + regionId + ", regionEpoch=" + regionEpoch - + ", value=" + value + '}'; + return "BaseResponse{" + "error=" + error + ", errorMsg='" + errorMsg + '\'' + ", regionId=" + regionId + + ", regionEpoch=" + regionEpoch + ", value=" + value + '}'; } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java index 81726c8ad..f46f8791b 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java @@ -152,7 +152,8 @@ static void setFailure(final KVStoreClosure closure, final String message) { * @param error critical error */ static void setCriticalError(final KVStoreClosure closure, final String message, final Throwable error) { - setFailure(closure, message); + setClosureError(closure); + // Will call closure#run in FSMCaller if (error != null) { throw new StorageException(message, error); } @@ -169,11 +170,19 @@ static void setCriticalError(final KVStoreClosure closure, final String message, * @param error critical error */ static void setCriticalError(final List closures, final String message, final Throwable error) { - for (final KVStoreClosure c : closures) { - setFailure(c, message); + for (final KVStoreClosure closure : closures) { + // Will call closure#run in FSMCaller + setClosureError(closure); } if (error != null) { throw new StorageException(message, error); } } + + static void setClosureError(final KVStoreClosure closure) { + if (closure != null) { + // closure is null on follower node + closure.setError(Errors.STORAGE_ERROR); + } + } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java index e898a0f16..6e7da1d2b 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java @@ -81,6 +81,7 @@ public KVStoreStateMachine(Region region, StoreEngine storeEngine) { @Override public void onApply(final Iterator it) { int index = 0; + int applied = 0; try { KVStateOutputList kvStates = KVStateOutputList.newInstance(); while (it.hasNext()) { @@ -103,7 +104,7 @@ public void onApply(final Iterator it) { } final KVState first = kvStates.getFirstElement(); if (first != null && !first.isSameOp(kvOp)) { - batchApplyAndRecycle(first.getOpByte(), kvStates); + applied += batchApplyAndRecycle(first.getOpByte(), kvStates); kvStates = KVStateOutputList.newInstance(); } kvStates.add(KVState.of(kvOp, done)); @@ -113,22 +114,25 @@ public void onApply(final Iterator it) { if (!kvStates.isEmpty()) { final KVState first = kvStates.getFirstElement(); assert first != null; - batchApplyAndRecycle(first.getOpByte(), kvStates); + applied += batchApplyAndRecycle(first.getOpByte(), kvStates); } } catch (final Throwable t) { - it.setErrorAndRollback(index, - new Status(RaftError.EIO, "StateMachine meet critical error: %s.", t.getMessage())); + it.setErrorAndRollback(index - applied, new Status(RaftError.EIO, "StateMachine meet critical error: %s.", + t.getMessage())); } finally { // metrics: qps - this.applyMeter.mark(index); + this.applyMeter.mark(applied); } } - private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) { + private int batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvStates) { try { - if (kvStates.isEmpty()) { - return; + final int size = kvStates.size(); + + if (size == 0) { + return 0; } + if (!KVOperation.isValidOp(opByte)) { throw new IllegalKVOperationException("Unknown operation: " + opByte); } @@ -136,12 +140,13 @@ private void batchApplyAndRecycle(final byte opByte, final KVStateOutputList kvS // metrics: op qps final Meter opApplyMeter = KVMetrics.meter(STATE_MACHINE_APPLY_QPS, String.valueOf(this.region.getId()), KVOperation.opName(opByte)); - final int size = kvStates.size(); opApplyMeter.mark(size); this.batchWriteHistogram.update(size); // do batch apply batchApply(opByte, kvStates); + + return size; } finally { RecycleUtil.recycle(kvStates); } From 1142eb00c1194115b3aab2149087147320a603c8 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Mon, 29 Apr 2019 17:49:46 +0800 Subject: [PATCH 11/16] (feat) add unit test --- .../alipay/sofa/jraft/core/IteratorImpl.java | 2 +- .../rhea/storage/KVStateMachineTest.java | 245 ++++++++++++++++++ 2 files changed, 246 insertions(+), 1 deletion(-) create mode 100644 jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java index 0891a90b8..a272b2fc0 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/IteratorImpl.java @@ -147,7 +147,7 @@ public void setErrorAndRollback(final long ntail, final Status st) { this.currEntry = null; getOrCreateError().setType(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE); getOrCreateError().getStatus().setError(RaftError.ESTATEMACHINE, - "StateMachine meet critical error when applying one or more tasks since index=%d, %s", this.currentIndex, + "StateMachine meet critical error when applying one or more tasks since index=%d, %s", this.currentIndex, st != null ? st.toString() : "none"); } diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java new file mode 100644 index 000000000..7da01fba2 --- /dev/null +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.rhea.storage; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.io.FileUtils; +import org.junit.Before; +import org.junit.Test; + +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rhea.StoreEngine; +import com.alipay.sofa.jraft.rhea.client.pd.FakePlacementDriverClient; +import com.alipay.sofa.jraft.rhea.metadata.Region; +import com.alipay.sofa.jraft.rhea.util.ThrowUtil; +import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.Endpoint; + +import static com.alipay.sofa.jraft.core.State.STATE_ERROR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author jiachun.fjc + */ +public class KVStateMachineTest { + + private static final int APPLY_COUNT = 100; + private static final int SUCCESS_COUNT = 10; + + private RaftGroupService raftGroupService; + private RaftRawKVStore raftRawKVStore; + private File raftDataPath; + + @Before + public void setup() throws IOException, InterruptedException { + final Region region = new Region(); + region.setId(1); + final StoreEngine storeEngine = new MockStoreEngine(); + final KVStoreStateMachine fsm = new KVStoreStateMachine(region, storeEngine); + final NodeOptions nodeOpts = new NodeOptions(); + final Configuration conf = new Configuration(); + conf.addPeer(PeerId.parsePeer("127.0.0.1:8081")); + nodeOpts.setInitialConf(conf); + nodeOpts.setFsm(fsm); + + final String raftDataPath = "raft_st_test"; + this.raftDataPath = new File(raftDataPath); + FileUtils.forceDelete(this.raftDataPath); + FileUtils.forceMkdir(this.raftDataPath); + + final Path logUri = Paths.get(raftDataPath, "log"); + nodeOpts.setLogUri(logUri.toString()); + + final Path meteUri = Paths.get(raftDataPath, "meta"); + nodeOpts.setRaftMetaUri(meteUri.toString()); + + final Path snapshotUri = Paths.get(raftDataPath, "snapshot"); + nodeOpts.setSnapshotUri(snapshotUri.toString()); + + final Endpoint serverAddress = new Endpoint("127.0.0.1", 8081); + final PeerId serverId = new PeerId(serverAddress, 0); + this.raftGroupService = new RaftGroupService("st_test", serverId, nodeOpts, null, true); + + final Node node = this.raftGroupService.start(false); + + for (int i = 0; i < 100; i++) { + if (node.isLeader()) { + break; + } + Thread.sleep(100); + } + + final RawKVStore rawKVStore = storeEngine.getRawKVStore(); + this.raftRawKVStore = new RaftRawKVStore(node, rawKVStore, null); + } + + @Test + public void tearDown() throws IOException { + if (this.raftGroupService != null) { + this.raftGroupService.shutdown(); + try { + this.raftGroupService.join(); + } catch (final InterruptedException e) { + ThrowUtil.throwException(e); + } + } + FileUtils.forceDelete(this.raftDataPath); + } + + @Test + public void failApplyTest() throws Exception { + final CountDownLatch latch = new CountDownLatch(APPLY_COUNT); + final List closures = new ArrayList<>(); + final BlockingQueue successQueue = new ArrayBlockingQueue<>(APPLY_COUNT); + final BlockingQueue failQueue = new ArrayBlockingQueue<>(APPLY_COUNT); + assertTrue(this.raftGroupService.getRaftNode().isLeader()); + for (int i = 0; i < SUCCESS_COUNT; i++) { + final KVStoreClosure c = new BaseKVStoreClosure() { + + @Override + public void run(Status status) { + latch.countDown(); + successQueue.add(status); + } + }; + closures.add(c); + } + for (int i = SUCCESS_COUNT; i < APPLY_COUNT; i++) { + final KVStoreClosure c = new BaseKVStoreClosure() { + + @Override + public void run(Status status) { + latch.countDown(); + failQueue.add(status); + } + }; + closures.add(c); + } + + for (int i = 0; i < SUCCESS_COUNT; i++) { + final byte[] bytes = BytesUtil.writeUtf8(String.valueOf(i)); + this.raftRawKVStore.put(bytes, bytes, closures.get(i)); + } + + for (int i = SUCCESS_COUNT; i < APPLY_COUNT; i++) { + final byte[] bytes = BytesUtil.writeUtf8(String.valueOf(i)); + this.raftRawKVStore.merge(bytes, bytes, closures.get(i)); + } + + latch.await(); + + final Node node = this.raftGroupService.getRaftNode(); + assertFalse(node.isLeader()); + final Field field = node.getClass().getDeclaredField("state"); + field.setAccessible(true); + assertEquals(field.get(node), STATE_ERROR); + assertEquals(SUCCESS_COUNT, successQueue.size()); + assertEquals(APPLY_COUNT - SUCCESS_COUNT, failQueue.size()); + while (true) { + final Status st = successQueue.poll(); + if (st == null) { + break; + } + assertTrue(st.isOk()); + } + + while (true) { + final Status st = failQueue.poll(); + if (st == null) { + break; + } + assertFalse(st.isOk()); + assertEquals(RaftError.ESTATEMACHINE, st.getRaftError()); + } + } + + static class MockKVStore extends MemoryRawKVStore { + + private int putIndex = 0; + + @Override + public void put(byte[] key, byte[] value, KVStoreClosure closure) { + if (this.putIndex++ < SUCCESS_COUNT) { + if (closure != null) { + closure.setData(value); + closure.run(Status.OK()); + } + } else { + throw new RuntimeException("fail test"); + } + } + + @Override + public void merge(byte[] key, byte[] value, KVStoreClosure closure) { + if (this.putIndex++ < SUCCESS_COUNT) { + if (closure != null) { + closure.setData(value); + closure.run(Status.OK()); + } + } else { + throw new RuntimeException("fail test"); + } + } + } + + static class MockStoreEngine extends StoreEngine { + + private final MockKVStore mockKVStore = new MockKVStore(); + private final ExecutorService leaderStateTrigger = Executors.newSingleThreadExecutor(); + + public MockStoreEngine() { + super(new MockPlacementDriverClient()); + } + + @Override + public BatchRawKVStore getRawKVStore() { + return this.mockKVStore; + } + + @Override + public ExecutorService getLeaderStateTrigger() { + return this.leaderStateTrigger; + } + } + + static class MockPlacementDriverClient extends FakePlacementDriverClient { + + public MockPlacementDriverClient() { + super(1, "st_test"); + } + } +} From b65cf85a452b44b3fce547f80fec8b2c13892e13 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 30 Apr 2019 12:35:15 +0800 Subject: [PATCH 12/16] (fix) fix unit test --- .../java/com/alipay/sofa/jraft/core/IteratorImplTest.java | 4 ++-- .../test/java/com/alipay/sofa/jraft/core/IteratorTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java index a61a2d9a6..2de7e32e4 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorImplTest.java @@ -113,7 +113,7 @@ public void testRunTheRestClosureWithError() throws Exception { final Status s = mc.s; Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), s.getCode()); assertEquals( - "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", + "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", s.getErrorMsg()); } } @@ -129,7 +129,7 @@ public void testSetErrorAndRollback() { Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), iter.getError().getStatus().getCode()); Assert .assertEquals( - "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", + "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", iter.getError().getStatus().getErrorMsg()); assertEquals(6, iter.getIndex()); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java index d4061cacb..a7221931b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/IteratorTest.java @@ -107,7 +107,7 @@ public void testSetErrorAndRollback() { Assert.assertEquals(RaftError.ESTATEMACHINE.getNumber(), iterImpl.getError().getStatus().getCode()); Assert .assertEquals( - "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", + "StateMachine meet critical error when applying one or more tasks since index=6, Status[UNKNOWN<-1>: test]", iterImpl.getError().getStatus().getErrorMsg()); assertEquals(6, iter.getIndex()); } From 6185e18f95c18ee00b542c4ccd7d1bdb50544eba Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 30 Apr 2019 13:18:43 +0800 Subject: [PATCH 13/16] (fix) fix unit test --- .../sofa/jraft/rhea/storage/KVStateMachineTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java index 7da01fba2..6e54036e6 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -78,7 +79,9 @@ public void setup() throws IOException, InterruptedException { final String raftDataPath = "raft_st_test"; this.raftDataPath = new File(raftDataPath); - FileUtils.forceDelete(this.raftDataPath); + if (this.raftDataPath.exists()) { + FileUtils.forceDelete(this.raftDataPath); + } FileUtils.forceMkdir(this.raftDataPath); final Path logUri = Paths.get(raftDataPath, "log"); @@ -107,7 +110,7 @@ public void setup() throws IOException, InterruptedException { this.raftRawKVStore = new RaftRawKVStore(node, rawKVStore, null); } - @Test + @After public void tearDown() throws IOException { if (this.raftGroupService != null) { this.raftGroupService.shutdown(); @@ -117,7 +120,9 @@ public void tearDown() throws IOException { ThrowUtil.throwException(e); } } - FileUtils.forceDelete(this.raftDataPath); + if (this.raftDataPath.exists()) { + FileUtils.forceDelete(this.raftDataPath); + } } @Test From f2ed77a1cbce93439f8fe16a23de077cd4fb8076 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Sun, 5 May 2019 13:04:16 +0800 Subject: [PATCH 14/16] (fix) fix unit test --- .../com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java index 6e54036e6..805a973cc 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java @@ -188,7 +188,7 @@ public void run(Status status) { break; } assertFalse(st.isOk()); - assertEquals(RaftError.ESTATEMACHINE, st.getRaftError()); + assertTrue(st.getRaftError() == RaftError.ESTATEMACHINE || st.getRaftError() == RaftError.EPERM); } } From 6b5683a7d7b6d2a40b3c7be788572c026bbcebe8 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Tue, 7 May 2019 21:56:31 +0800 Subject: [PATCH 15/16] (fix) fix unit test --- .../rhea/storage/KVStateMachineTest.java | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java index 805a973cc..4f90a8d0a 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/KVStateMachineTest.java @@ -204,7 +204,7 @@ public void put(byte[] key, byte[] value, KVStoreClosure closure) { closure.run(Status.OK()); } } else { - throw new RuntimeException("fail test"); + throw new RuntimeException("fail put test"); } } @@ -216,7 +216,28 @@ public void merge(byte[] key, byte[] value, KVStoreClosure closure) { closure.run(Status.OK()); } } else { - throw new RuntimeException("fail test"); + throw new RuntimeException("fail merge test"); + } + } + + @Override + void doSnapshotSave(MemoryKVStoreSnapshotFile snapshotFile, String snapshotPath, Region region) + throws Exception { + super.doSnapshotSave(snapshotFile, snapshotPath, region); + snapshotFile.writeToFile(snapshotPath, "putIndex", new PutIndex(this.putIndex)); + } + + @Override + void doSnapshotLoad(MemoryKVStoreSnapshotFile snapshotFile, String snapshotPath) throws Exception { + super.doSnapshotLoad(snapshotFile, snapshotPath); + final PutIndex p = snapshotFile.readFromFile(snapshotPath, "putIndex", PutIndex.class); + this.putIndex = p.data(); + } + + class PutIndex extends MemoryKVStoreSnapshotFile.Persistence { + + public PutIndex(Integer data) { + super(data); } } } From b68bc4a2d1741293eff551957fb443824f7def63 Mon Sep 17 00:00:00 2001 From: "jiachun.fjc" Date: Wed, 8 May 2019 14:48:26 +0800 Subject: [PATCH 16/16] (fix) by code review comments --- .../sofa/jraft/rhea/client/DefaultRheaKVCliService.java | 2 +- .../alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java index fa04c99fd..ee8a76619 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVCliService.java @@ -84,7 +84,7 @@ public Status rangeSplit(final long regionId, final long newRegionId, final Stri if (response.isSuccess()) { return Status.OK(); } - return new Status(-1, "Fail to range split on region %d, error: %s", regionId, String.valueOf(response)); + return new Status(-1, "Fail to range split on region %d, error: %s", regionId, response); } catch (final Exception e) { LOG.error("Fail to range split on exception: {}.", StackTraceUtil.stackTrace(e)); return new Status(-1, "fail to range split on region %d", regionId); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java index c145eff8e..339327535 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java @@ -40,6 +40,7 @@ import com.alipay.sofa.jraft.rhea.serialization.Serializers; import com.alipay.sofa.jraft.rhea.util.Pair; import com.alipay.sofa.jraft.rhea.util.RecycleUtil; +import com.alipay.sofa.jraft.rhea.util.StackTraceUtil; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.alipay.sofa.jraft.util.BytesUtil; @@ -117,8 +118,9 @@ public void onApply(final Iterator it) { applied += batchApplyAndRecycle(first.getOpByte(), kvStates); } } catch (final Throwable t) { - it.setErrorAndRollback(index - applied, new Status(RaftError.EIO, "StateMachine meet critical error: %s.", - t.getMessage())); + LOG.error("StateMachine meet critical error: {}.", StackTraceUtil.stackTrace(t)); + it.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE, + "StateMachine meet critical error: %s.", t.getMessage())); } finally { // metrics: qps this.applyMeter.mark(applied);