diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/ReverseScanExample.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/ReverseScanExample.java new file mode 100644 index 000000000..6f8e0a44c --- /dev/null +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/ReverseScanExample.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.jraft.example.rheakv; + +import com.alipay.sofa.jraft.rhea.client.RheaKVStore; +import com.alipay.sofa.jraft.rhea.storage.KVEntry; +import com.alipay.sofa.jraft.rhea.util.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8; +import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8; + +/** + * + * @author baozi + */ +public class ReverseScanExample { + + private static final Logger LOG = LoggerFactory.getLogger(ReverseScanExample.class); + + public static void main(final String[] args) throws Exception { + final Client client = new Client(); + client.init(); + scan(client.getRheaKVStore()); + client.shutdown(); + } + + @SuppressWarnings("unchecked") + public static void scan(final RheaKVStore rheaKVStore) { + final List keys = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + final byte[] bytes = writeUtf8("scan_demo_" + i); + keys.add(bytes); + rheaKVStore.bPut(bytes, bytes); + } + + final byte[] firstKey = keys.get(keys.size() - 1); + final byte[] lastKey = keys.get(0); + final String firstKeyString = readUtf8(firstKey); + final String lastKeyString = readUtf8(lastKey); + + // async scan + final CompletableFuture> f1 = rheaKVStore.reverseScan(firstKey, lastKey); + final CompletableFuture> f2 = rheaKVStore.reverseScan(firstKey, lastKey, false); + final CompletableFuture> f3 = rheaKVStore.reverseScan(firstKeyString, lastKeyString); + final CompletableFuture> f4 = rheaKVStore.reverseScan(firstKeyString, lastKeyString, false); + CompletableFuture.allOf(f1, f2, f3, f4).join(); + for (final CompletableFuture> f : new CompletableFuture[] { f1, f2, f3, f4 }) { + for (final KVEntry kv : f.join()) { + LOG.info("Async reverseScan: key={}, value={}", readUtf8(kv.getKey()), readUtf8(kv.getValue())); + } + } + + // sync scan + final List l1 = rheaKVStore.bReverseScan(firstKey, lastKey); + final List l2 = rheaKVStore.bReverseScan(firstKey, lastKey, false); + final List l3 = rheaKVStore.bReverseScan(firstKeyString, lastKeyString); + final List l4 = rheaKVStore.bReverseScan(firstKeyString, lastKeyString, false); + for (final List l : new List[] { l1, l2, l3, l4 }) { + for (final KVEntry kv : l) { + LOG.info("sync reverseScan: key={}, value={}", readUtf8(kv.getKey()), readUtf8(kv.getValue())); + } + } + } +} diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/ScanExample.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/ScanExample.java index 2531eff91..a169da99f 100644 --- a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/ScanExample.java +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/rheakv/ScanExample.java @@ -77,7 +77,7 @@ public static void scan(final RheaKVStore rheaKVStore) { final List l4 = rheaKVStore.bScan(firstKeyString, lastKeyString, false); for (final List l : new List[] { l1, l2, l3, l4 }) { for (final KVEntry kv : l) { - LOG.info("Async scan: key={}, value={}", readUtf8(kv.getKey()), readUtf8(kv.getValue())); + LOG.info("sync scan: key={}, value={}", readUtf8(kv.getKey()), readUtf8(kv.getValue())); } } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java index ce8ffcad3..fa095eba8 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/DefaultRegionKVService.java @@ -455,20 +455,26 @@ public void handleScanRequest(final ScanRequest request, response.setRegionEpoch(getRegionEpoch()); try { KVParameterRequires.requireSameEpoch(request, getRegionEpoch()); - this.rawKVStore.scan(request.getStartKey(), request.getEndKey(), request.getLimit(), - request.isReadOnlySafe(), request.isReturnValue(), new BaseKVStoreClosure() { - - @SuppressWarnings("unchecked") - @Override - public void run(final Status status) { - if (status.isOk()) { - response.setValue((List) getData()); - } else { - setFailure(request, response, status, getError()); - } - closure.sendResponse(response); + BaseKVStoreClosure KVStoreClosure = new BaseKVStoreClosure() { + + @SuppressWarnings("unchecked") + @Override + public void run(final Status status) { + if (status.isOk()) { + response.setValue((List) getData()); + } else { + setFailure(request, response, status, getError()); } - }); + closure.sendResponse(response); + } + }; + if (request.isReverse()) { + this.rawKVStore.reverseScan(request.getStartKey(), request.getEndKey(), request.getLimit(), + request.isReadOnlySafe(), request.isReturnValue(), KVStoreClosure); + } else { + this.rawKVStore.scan(request.getStartKey(), request.getEndKey(), request.getLimit(), + request.isReadOnlySafe(), request.isReturnValue(), KVStoreClosure); + } } catch (final Throwable t) { LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t)); response.setError(Errors.forException(t)); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVStore.java index a7b2a00a0..259952f2c 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/DefaultRheaKVStore.java @@ -16,6 +16,7 @@ */ package com.alipay.sofa.jraft.rhea.client; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -610,8 +611,7 @@ private FutureGroup> internalScan(final byte[] startKey, final byt final boolean readOnlySafe, final boolean returnValue, final int retriesLeft, final Throwable lastCause) { Requires.requireNonNull(startKey, "startKey"); - final List regionList = this.pdClient - .findRegionsByKeyRange(startKey, endKey, ApiExceptionHelper.isInvalidEpoch(lastCause)); + final List regionList = this.pdClient.findRegionsByKeyRange(startKey, endKey, ApiExceptionHelper.isInvalidEpoch(lastCause)); final List>> futures = Lists.newArrayListWithCapacity(regionList.size()); final Errors lastError = lastCause == null ? null : Errors.forException(lastCause); for (final Region region : regionList) { @@ -623,7 +623,7 @@ private FutureGroup> internalScan(final byte[] startKey, final byt final ListRetryCallable retryCallable = retryCause -> internalScan(subStartKey, subEndKey, readOnlySafe, returnValue, retriesLeft - 1, retryCause); final ListFailoverFuture future = new ListFailoverFuture<>(retriesLeft, retryCallable); - internalRegionScan(region, subStartKey, subEndKey, readOnlySafe, returnValue, future, retriesLeft, lastError, + internalRegionScan(region, subStartKey, subEndKey, false, readOnlySafe, returnValue, future, retriesLeft, lastError, this.onlyLeaderRead); futures.add(future); } @@ -631,23 +631,32 @@ private FutureGroup> internalScan(final byte[] startKey, final byt } private void internalRegionScan(final Region region, final byte[] subStartKey, final byte[] subEndKey, - final boolean readOnlySafe, final boolean returnValue, + final boolean reverse, final boolean readOnlySafe, final boolean returnValue, final CompletableFuture> future, final int retriesLeft, final Errors lastCause, final boolean requireLeader) { final RegionEngine regionEngine = getRegionEngine(region.getId(), requireLeader); // require leader on retry - final RetryRunner retryRunner = retryCause -> internalRegionScan(region, subStartKey, subEndKey, readOnlySafe, + final RetryRunner retryRunner = retryCause -> internalRegionScan(region, subStartKey, subEndKey, reverse, readOnlySafe, returnValue, future, retriesLeft - 1, retryCause, true); final FailoverClosure> closure = new FailoverClosureImpl<>(future, false, retriesLeft, retryRunner); if (regionEngine != null) { if (ensureOnValidEpoch(region, regionEngine, closure)) { final RawKVStore rawKVStore = getRawKVStore(regionEngine); - if (this.kvDispatcher == null) { - rawKVStore.scan(subStartKey, subEndKey, readOnlySafe, returnValue, closure); + if (reverse) { + if (this.kvDispatcher == null) { + rawKVStore.reverseScan(subStartKey, subEndKey, readOnlySafe, returnValue, closure); + } else { + this.kvDispatcher.execute( + () -> rawKVStore.reverseScan(subStartKey, subEndKey, readOnlySafe, returnValue, closure)); + } } else { - this.kvDispatcher.execute( - () -> rawKVStore.scan(subStartKey, subEndKey, readOnlySafe, returnValue, closure)); + if (this.kvDispatcher == null) { + rawKVStore.scan(subStartKey, subEndKey, readOnlySafe, returnValue, closure); + } else { + this.kvDispatcher.execute( + () -> rawKVStore.scan(subStartKey, subEndKey, readOnlySafe, returnValue, closure)); + } } } } else { @@ -658,10 +667,107 @@ private void internalRegionScan(final Region region, final byte[] subStartKey, f request.setReturnValue(returnValue); request.setRegionId(region.getId()); request.setRegionEpoch(region.getRegionEpoch()); + request.setReverse(reverse); this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause, requireLeader); } } + @Override + public CompletableFuture> reverseScan(final byte[] startKey, final byte[] endKey) { + return reverseScan(startKey, endKey, true); + } + + @Override + public CompletableFuture> reverseScan(final String startKey, final String endKey) { + return reverseScan(BytesUtil.writeUtf8(startKey), BytesUtil.writeUtf8(endKey)); + } + + @Override + public CompletableFuture> reverseScan(final byte[] startKey, final byte[] endKey, + final boolean readOnlySafe) { + return reverseScan(startKey, endKey, readOnlySafe, true); + } + + @Override + public CompletableFuture> reverseScan(final String startKey, final String endKey, + final boolean readOnlySafe) { + return reverseScan(BytesUtil.writeUtf8(startKey), BytesUtil.writeUtf8(endKey), readOnlySafe); + } + + @Override + public CompletableFuture> reverseScan(final byte[] startKey, final byte[] endKey, + final boolean readOnlySafe, final boolean returnValue) { + checkState(); + final byte[] realEndKey = BytesUtil.nullToEmpty(endKey); + if (startKey != null) { + Requires.requireTrue(BytesUtil.compare(startKey, realEndKey) > 0, "startKey must > endKey"); + } + final FutureGroup> futureGroup = internalReverseScan(startKey, realEndKey, readOnlySafe, + returnValue, this.failoverRetries, null); + return FutureHelper.joinList(futureGroup); + } + + @Override + public CompletableFuture> reverseScan(final String startKey, final String endKey, + final boolean readOnlySafe, final boolean returnValue) { + return reverseScan(BytesUtil.writeUtf8(startKey), BytesUtil.writeUtf8(endKey), readOnlySafe, returnValue); + } + + @Override + public List bReverseScan(final byte[] startKey, final byte[] endKey) { + return FutureHelper.get(reverseScan(startKey, endKey), this.futureTimeoutMillis); + } + + @Override + public List bReverseScan(final String startKey, final String endKey) { + return FutureHelper.get(reverseScan(startKey, endKey), this.futureTimeoutMillis); + } + + @Override + public List bReverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe) { + return FutureHelper.get(reverseScan(startKey, endKey, readOnlySafe), this.futureTimeoutMillis); + } + + @Override + public List bReverseScan(final String startKey, final String endKey, final boolean readOnlySafe) { + return FutureHelper.get(reverseScan(startKey, endKey, readOnlySafe), this.futureTimeoutMillis); + } + + @Override + public List bReverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, + final boolean returnValue) { + return FutureHelper.get(reverseScan(startKey, endKey, readOnlySafe, returnValue), this.futureTimeoutMillis); + } + + @Override + public List bReverseScan(final String startKey, final String endKey, final boolean readOnlySafe, + final boolean returnValue) { + return FutureHelper.get(reverseScan(startKey, endKey, readOnlySafe, returnValue), this.futureTimeoutMillis); + } + + private FutureGroup> internalReverseScan(final byte[] startKey, final byte[] endKey, + final boolean readOnlySafe, final boolean returnValue, + final int retriesLeft, final Throwable lastCause) { + Requires.requireNonNull(endKey, "endKey"); + final List regionList = this.pdClient.findRegionsByKeyRange(endKey, startKey, ApiExceptionHelper.isInvalidEpoch(lastCause)); + Collections.reverse(regionList); + final List>> futures = Lists.newArrayListWithCapacity(regionList.size()); + final Errors lastError = lastCause == null ? null : Errors.forException(lastCause); + for (final Region region : regionList) { + final byte[] regionEndKey = region.getEndKey(); + final byte[] regionStartKey = region.getStartKey(); + final byte[] subStartKey = regionEndKey == null ? startKey : (startKey == null ? regionEndKey : BytesUtil.min(regionEndKey, startKey)); + final byte[] subEndKey = regionStartKey == null ? endKey : BytesUtil.max(regionStartKey, endKey); + final ListRetryCallable retryCallable = retryCause -> internalReverseScan(subStartKey, subEndKey, + readOnlySafe, returnValue, retriesLeft - 1, retryCause); + final ListFailoverFuture future = new ListFailoverFuture<>(retriesLeft, retryCallable); + internalRegionScan(region, subStartKey, subEndKey, true, readOnlySafe, returnValue, future, retriesLeft, lastError, + this.onlyLeaderRead ); + futures.add(future); + } + return new FutureGroup<>(futures); + } + public List singleRegionScan(final byte[] startKey, final byte[] endKey, final int limit, final boolean readOnlySafe, final boolean returnValue) { checkState(); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/RheaKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/RheaKVStore.java index e4387f37c..8726f463f 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/RheaKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/client/RheaKVStore.java @@ -254,6 +254,86 @@ List bScan(final byte[] startKey, final byte[] endKey, final boolean re List bScan(final String startKey, final String endKey, final boolean readOnlySafe, final boolean returnValue); + /** + * Equivalent to {@code reverseScan(startKey, endKey, true)}. + */ + CompletableFuture> reverseScan(final byte[] startKey, final byte[] endKey); + + /** + * @see #reverseScan(byte[], byte[]) + */ + CompletableFuture> reverseScan(final String startKey, final String endKey); + + /** + * Equivalent to {@code reverseScan(startKey, endKey, readOnlySafe, true)}. + */ + CompletableFuture> reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe); + + /** + * @see #reverseScan(byte[], byte[], boolean) + */ + CompletableFuture> reverseScan(final String startKey, final String endKey, final boolean readOnlySafe); + + /** + * Reverse query all data in the key of range [startKey, endKey). + *

+ * Provide consistent reading if {@code readOnlySafe} is true. + * + * Reverse scanning is usually much worse than forward scanning. + * + * Reverse scanning across multi regions maybe slower and devastating. + * + * @param startKey first key to reverse scan within database (included), + * null means 'max-key' in the database. + * @param endKey last key to reverse scan within database (excluded). + * null means 'min-key' in the database. + * @param readOnlySafe provide consistent reading if {@code readOnlySafe} + * is true. + * @param returnValue whether to return value. + * @return a list where the key of range [startKey, endKey) passed by user + * and value for {@code KVEntry} + */ + CompletableFuture> reverseScan(final byte[] startKey, final byte[] endKey, + final boolean readOnlySafe, final boolean returnValue); + + /** + * @see #reverseScan(byte[], byte[], boolean, boolean) + */ + CompletableFuture> reverseScan(final String startKey, final String endKey, + final boolean readOnlySafe, final boolean returnValue); + + /** + * @see #reverseScan(byte[], byte[]) + */ + List bReverseScan(final byte[] startKey, final byte[] endKey); + + /** + * @see #scan(String, String) + */ + List bReverseScan(final String startKey, final String endKey); + + /** + * @see #scan(String, String, boolean) + */ + List bReverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe); + + /** + * @see #scan(String, String, boolean) + */ + List bReverseScan(final String startKey, final String endKey, final boolean readOnlySafe); + + /** + * @see #reverseScan(String, String, boolean, boolean) + */ + List bReverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, + final boolean returnValue); + + /** + * @see #reverseScan(String, String, boolean, boolean) + */ + List bReverseScan(final String startKey, final String endKey, final boolean readOnlySafe, + final boolean returnValue); + /** * Equivalent to {@code iterator(startKey, endKey, bufSize, true)}. */ diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/ScanRequest.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/ScanRequest.java index a9d88dd89..0e058e678 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/ScanRequest.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/cmd/store/ScanRequest.java @@ -36,6 +36,15 @@ public class ScanRequest extends BaseRequest { private int limit; private boolean readOnlySafe = true; private boolean returnValue = true; + private boolean reverse = false; + + public boolean isReverse() { + return reverse; + } + + public void setReverse(boolean reverse) { + this.reverse = reverse; + } public byte[] getStartKey() { return startKey; @@ -85,7 +94,7 @@ public byte magic() { @Override public String toString() { return "ScanRequest{" + "startKey=" + BytesUtil.toHex(startKey) + ", endKey=" + BytesUtil.toHex(endKey) - + ", limit=" + limit + ", readOnlySafe=" + readOnlySafe + ", returnValue=" + returnValue + "} " - + super.toString(); + + ", limit=" + limit + ", reverse=" + reverse + ", readOnlySafe=" + readOnlySafe + ", returnValue=" + + returnValue + "} " + super.toString(); } } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java index 194b4615a..d8d2a0699 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BaseRawKVStore.java @@ -75,6 +75,34 @@ public void scan(final byte[] startKey, final byte[] endKey, final int limit, fi scan(startKey, endKey, limit, readOnlySafe, true, closure); } + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final KVStoreClosure closure) { + reverseScan(startKey, endKey, Integer.MAX_VALUE, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, + final KVStoreClosure closure) { + reverseScan(startKey, endKey, Integer.MAX_VALUE, readOnlySafe, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, + final boolean returnValue, final KVStoreClosure closure) { + reverseScan(startKey, endKey, Integer.MAX_VALUE, readOnlySafe, returnValue, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final KVStoreClosure closure) { + reverseScan(startKey, endKey, limit, true, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final boolean readOnlySafe, + final KVStoreClosure closure) { + reverseScan(startKey, endKey, limit, readOnlySafe, true, closure); + } + @Override public void execute(final NodeExecutor nodeExecutor, final boolean isLeader, final KVStoreClosure closure) { final Timer.Context timeCtx = getTimeContext("EXECUTE"); @@ -99,6 +127,19 @@ public long getSafeEndValueForSequence(final long startVal, final int step) { return Math.max(startVal, Long.MAX_VALUE - step < startVal ? Long.MAX_VALUE : startVal + step); } + /** + * If limit == 0, it will be modified to Integer.MAX_VALUE on the server + * and then queried. So 'limit == 0' means that the number of queries is + * not limited. This is because serialization uses varint to compress + * numbers. In the case of 0, only 1 byte is occupied, and Integer.MAX_VALUE + * takes 5 bytes. + * @param limit + * @return + */ + protected int normalizeLimit(final int limit) { + return limit > 0 ? limit : Integer.MAX_VALUE; + } + /** * Note: This is not a very precise behavior, don't rely on its accuracy. */ diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java index bee7d513f..87d57c140 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/BatchRawKVStore.java @@ -134,6 +134,14 @@ public void batchScan(final KVStateOutputList kvStates) { } } + public void batchReverseScan(final KVStateOutputList kvStates) { + for (int i = 0, l = kvStates.size(); i < l; i++) { + final KVState kvState = kvStates.get(i); + final KVOperation op = kvState.getOp(); + reverseScan(op.getStartKey(), op.getEndKey(), op.getLimit(), true, op.isReturnValue(), kvState.getDone()); + } + } + public void batchGetAndPut(final KVStateOutputList kvStates) { for (int i = 0, l = kvStates.size(); i < l; i++) { final KVState kvState = kvStates.get(i); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java index fbc2f63f9..5751ca6cc 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVOperation.java @@ -77,12 +77,15 @@ public class KVOperation implements Serializable { /** Contains key operation */ public static final byte CONTAINS_KEY = 0x13; - public static final byte EOF = 0x14; + /** Reverse Scan operation */ + public static final byte REVERSE_SCAN = 0x14; + + public static final byte EOF = 0x15; private static final byte[] VALID_OPS; static { - VALID_OPS = new byte[19]; + VALID_OPS = new byte[EOF - 1]; VALID_OPS[0] = PUT; VALID_OPS[1] = PUT_IF_ABSENT; VALID_OPS[2] = DELETE; @@ -102,6 +105,7 @@ public class KVOperation implements Serializable { VALID_OPS[16] = COMPARE_PUT; VALID_OPS[17] = DELETE_LIST; VALID_OPS[18] = CONTAINS_KEY; + VALID_OPS[19] = REVERSE_SCAN; } private byte[] key; // also startKey for DELETE_RANGE @@ -201,6 +205,11 @@ public static KVOperation createScan(final byte[] startKey, final byte[] endKey, return new KVOperation(startKey, endKey, Pair.of(limit, returnValue), SCAN); } + public static KVOperation createReverseScan(final byte[] startKey, final byte[] endKey, final int limit, + final boolean returnValue) { + return new KVOperation(startKey, endKey, Pair.of(limit, returnValue), REVERSE_SCAN); + } + public static KVOperation createGetAndPut(final byte[] key, final byte[] value) { Requires.requireNonNull(key, "key"); Requires.requireNonNull(value, "value"); @@ -374,6 +383,8 @@ public static String opName(byte op) { return "RESET_SEQUENCE"; case RANGE_SPLIT: return "RANGE_SPLIT"; + case REVERSE_SCAN: + return "REVERSE_SCAN"; default: return "UNKNOWN" + op; } diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java index 481a0dd25..9e38a98c4 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/KVStoreStateMachine.java @@ -197,6 +197,9 @@ private void batchApply(final byte opType, final KVStateOutputList kvStates) { case KVOperation.SCAN: this.rawKVStore.batchScan(kvStates); break; + case KVOperation.REVERSE_SCAN: + this.rawKVStore.batchReverseScan(kvStates); + break; case KVOperation.GET_PUT: this.rawKVStore.batchGetAndPut(kvStates); break; diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java index 30e5940ed..d1597e229 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MemoryRawKVStore.java @@ -143,12 +143,7 @@ public void scan(final byte[] startKey, final byte[] endKey, final int limit, final KVStoreClosure closure) { final Timer.Context timeCtx = getTimeContext("SCAN"); final List entries = Lists.newArrayList(); - // If limit == 0, it will be modified to Integer.MAX_VALUE on the server - // and then queried. So 'limit == 0' means that the number of queries is - // not limited. This is because serialization uses varint to compress - // numbers. In the case of 0, only 1 byte is occupied, and Integer.MAX_VALUE - // takes 5 bytes. - final int maxCount = limit > 0 ? limit : Integer.MAX_VALUE; + int maxCount = normalizeLimit(limit); final ConcurrentNavigableMap subMap; final byte[] realStartKey = BytesUtil.nullToEmpty(startKey); if (endKey == null) { @@ -173,6 +168,37 @@ public void scan(final byte[] startKey, final byte[] endKey, final int limit, } } + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, + @SuppressWarnings("unused") final boolean readOnlySafe, final boolean returnValue, + final KVStoreClosure closure) { + final Timer.Context timeCtx = getTimeContext("REVERSE_SCAN"); + final List entries = Lists.newArrayList(); + int maxCount = normalizeLimit(limit); + final ConcurrentNavigableMap subMap; + final byte[] realEndKey = BytesUtil.nullToEmpty(endKey); + if (startKey == null) { + subMap = this.defaultDB.descendingMap().headMap(realEndKey); + } else { + subMap = this.defaultDB.descendingMap().subMap(startKey, realEndKey); + } + try { + for (final Map.Entry entry : subMap.entrySet()) { + entries.add(new KVEntry(entry.getKey(), returnValue ? entry.getValue() : null)); + if (entries.size() >= maxCount) { + break; + } + } + setSuccess(closure, entries); + } catch (final Exception e) { + LOG.error("Fail to [REVERSE_SCAN], range: ['[{}, {})'], {}.", BytesUtil.toHex(startKey), + BytesUtil.toHex(endKey), StackTraceUtil.stackTrace(e)); + setFailure(closure, "Fail to [REVERSE_SCAN]"); + } finally { + timeCtx.stop(); + } + } + @Override public void getSequence(final byte[] seqKey, final int step, final KVStoreClosure closure) { final Timer.Context timeCtx = getTimeContext("GET_SEQUENCE"); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java index 5cd8a14bb..cb2248723 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/MetricsRawKVStore.java @@ -40,6 +40,7 @@ import static com.alipay.sofa.jraft.rhea.storage.KVOperation.PUT_IF_ABSENT; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.PUT_LIST; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.RESET_SEQUENCE; +import static com.alipay.sofa.jraft.rhea.storage.KVOperation.REVERSE_SCAN; import static com.alipay.sofa.jraft.rhea.storage.KVOperation.SCAN; /** @@ -126,6 +127,41 @@ public void scan(final byte[] startKey, final byte[] endKey, final int limit, fi this.rawKVStore.scan(startKey, endKey, limit, readOnlySafe, returnValue, c); } + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final KVStoreClosure closure) { + reverseScan(startKey, endKey, Integer.MAX_VALUE, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, + final KVStoreClosure closure) { + reverseScan(startKey, endKey, Integer.MAX_VALUE, readOnlySafe, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, + final boolean returnValue, final KVStoreClosure closure) { + reverseScan(startKey, endKey, Integer.MAX_VALUE, readOnlySafe, returnValue, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final KVStoreClosure closure) { + reverseScan(startKey, endKey, limit, true, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final boolean readOnlySafe, + final KVStoreClosure closure) { + reverseScan(startKey, endKey, limit, readOnlySafe, true, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final boolean readOnlySafe, + final boolean returnValue, final KVStoreClosure closure) { + final KVStoreClosure c = metricsAdapter(closure, REVERSE_SCAN, 0, 0); + this.rawKVStore.reverseScan(startKey, endKey, limit, readOnlySafe, returnValue, c); + } + @Override public void getSequence(final byte[] seqKey, final int step, final KVStoreClosure closure) { final KVStoreClosure c = metricsAdapter(closure, GET_SEQUENCE, 1, 8); diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RaftRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RaftRawKVStore.java index d56e3b111..5e5c28bd1 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RaftRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RaftRawKVStore.java @@ -210,6 +210,64 @@ public void run(final Status status, final long index, final byte[] reqCtx) { }); } + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final KVStoreClosure closure) { + reverseScan(startKey, endKey, Integer.MAX_VALUE, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, + final KVStoreClosure closure) { + reverseScan(startKey, endKey, Integer.MAX_VALUE, readOnlySafe, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, + final boolean returnValue, final KVStoreClosure closure) { + reverseScan(startKey, endKey, Integer.MAX_VALUE, readOnlySafe, returnValue, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final KVStoreClosure closure) { + reverseScan(startKey, endKey, limit, true, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final boolean readOnlySafe, + final KVStoreClosure closure) { + reverseScan(startKey, endKey, limit, readOnlySafe, true, closure); + } + + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final boolean readOnlySafe, + final boolean returnValue, final KVStoreClosure closure) { + if (!readOnlySafe) { + this.kvStore.reverseScan(startKey, endKey, limit, false, returnValue, closure); + return; + } + this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { + + @Override + public void run(final Status status, final long index, final byte[] reqCtx) { + if (status.isOk()) { + RaftRawKVStore.this.kvStore.reverseScan(startKey, endKey, limit, true, returnValue, closure); + return; + } + RaftRawKVStore.this.readIndexExecutor.execute(() -> { + if (isLeader()) { + LOG.warn("Fail to [reverse scan] with 'ReadIndex': {}, try to applying to the state machine.", status); + // If 'read index' read fails, try to applying to the state machine at the leader node + applyOperation(KVOperation.createReverseScan(startKey, endKey, limit, returnValue), closure); + } else { + LOG.warn("Fail to [reverse scan] with 'ReadIndex': {}.", status); + // Client will retry to leader node + new KVClosureAdapter(closure, null).run(status); + } + }); + } + }); + } + @Override public void getSequence(final byte[] seqKey, final int step, final KVStoreClosure closure) { if (step > 0) { diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RawKVStore.java index dfee3559d..b0e7b5878 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RawKVStore.java @@ -118,6 +118,45 @@ void scan(final byte[] startKey, final byte[] endKey, final int limit, final boo void scan(final byte[] startKey, final byte[] endKey, final int limit, final boolean readOnlySafe, final boolean returnValue, final KVStoreClosure closure); + /** + * Equivalent to {@code reverseScan(startKey, endKey, Integer.MAX_VALUE, closure)}. + */ + void reverseScan(final byte[] startKey, final byte[] endKey, final KVStoreClosure closure); + + /** + * Equivalent to {@code reverseScan(startKey, endKey, Integer.MAX_VALUE, readOnlySafe, closure)}. + */ + void reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, + final KVStoreClosure closure); + + /** + * Equivalent to {@code reverseScan(startKey, endKey, Integer.MAX_VALUE, readOnlySafe, returnValue, closure)}. + */ + void reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe, final boolean returnValue, + final KVStoreClosure closure); + + /** + * Equivalent to {@code reverseScan(startKey, endKey, limit, true, closure)}. + */ + void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final KVStoreClosure closure); + + /** + * Equivalent to {@code reverseScan(startKey, endKey, limit, readOnlySafe, true, closure)}. + */ + void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final boolean readOnlySafe, + final KVStoreClosure closure); + + /** + * Reverse query all data in the key range of [startKey, endKey), + * {@code limit} is the max number of keys. + * + * Provide consistent reading if {@code readOnlySafe} is true. + * + * Only return keys(ignore values) if {@code returnValue} is false. + */ + void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, final boolean readOnlySafe, + final boolean returnValue, final KVStoreClosure closure); + /** * Get a globally unique auto-increment sequence. * diff --git a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java index ae77944af..d313637d1 100644 --- a/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java +++ b/jraft-rheakv/rheakv-core/src/main/java/com/alipay/sofa/jraft/rhea/storage/RocksRawKVStore.java @@ -308,12 +308,7 @@ public void scan(final byte[] startKey, final byte[] endKey, final int limit, final KVStoreClosure closure) { final Timer.Context timeCtx = getTimeContext("SCAN"); final List entries = Lists.newArrayList(); - // If limit == 0, it will be modified to Integer.MAX_VALUE on the server - // and then queried. So 'limit == 0' means that the number of queries is - // not limited. This is because serialization uses varint to compress - // numbers. In the case of 0, only 1 byte is occupied, and Integer.MAX_VALUE - // takes 5 bytes. - final int maxCount = limit > 0 ? limit : Integer.MAX_VALUE; + int maxCount = normalizeLimit(limit); final Lock readLock = this.readWriteLock.readLock(); readLock.lock(); try (final RocksIterator it = this.db.newIterator()) { @@ -342,6 +337,41 @@ public void scan(final byte[] startKey, final byte[] endKey, final int limit, } } + @Override + public void reverseScan(final byte[] startKey, final byte[] endKey, final int limit, + @SuppressWarnings("unused") final boolean readOnlySafe, final boolean returnValue, + final KVStoreClosure closure) { + final Timer.Context timeCtx = getTimeContext("REVERSE_SCAN"); + final List entries = Lists.newArrayList(); + int maxCount = normalizeLimit(limit); + final Lock readLock = this.readWriteLock.readLock(); + readLock.lock(); + try (final RocksIterator it = this.db.newIterator()) { + if (startKey == null) { + it.seekToLast(); + } else { + it.seekForPrev(startKey); + } + int count = 0; + while (it.isValid() && count++ < maxCount) { + final byte[] key = it.key(); + if (endKey != null && BytesUtil.compare(key, endKey) <= 0) { + break; + } + entries.add(new KVEntry(key, returnValue ? it.value() : null)); + it.prev(); + } + setSuccess(closure, entries); + } catch (final Exception e) { + LOG.error("Fail to [REVERSE_SCAN], range: ['[{}, {})'], {}.", BytesUtil.toHex(startKey), + BytesUtil.toHex(endKey), StackTraceUtil.stackTrace(e)); + setFailure(closure, "Fail to [REVERSE_SCAN]"); + } finally { + readLock.unlock(); + timeCtx.stop(); + } + } + @Override public void getSequence(final byte[] seqKey, final int step, final KVStoreClosure closure) { final Timer.Context timeCtx = getTimeContext("GET_SEQUENCE"); diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/memorydb/MemoryKVStoreTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/memorydb/MemoryKVStoreTest.java index 907581af7..37dbb574d 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/memorydb/MemoryKVStoreTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/memorydb/MemoryKVStoreTest.java @@ -306,6 +306,56 @@ public void execute(RawKVStore kvStore, KVStoreClosure closure) { assertEquals(entries.size(), 20); } + /** + * Test method: {@link MemoryRawKVStore#reverseScan(byte[], byte[], KVStoreClosure)} + */ + @Test + public void reverseScanTest() { + final List keyList = Lists.newArrayList(); + final List valueList = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + byte[] key = makeKey("scan_test_key_" + i); + byte[] value = makeValue("scan_test_key_" + i); + keyList.add(key); + valueList.add(value); + this.kvStore.put(key, value, null); + } + for (int i = 0; i < 10; i++) { + byte[] key = makeKey("no_scan_test_key_" + i); + byte[] value = makeValue("no_scan_test_key_" + i); + this.kvStore.put(key, value, null); + } + + List entries = new SyncKVStore>() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.reverseScan(makeKey("scan_test_key_" + 99), makeKey("scan_test_key_"), closure); + } + }.apply(this.kvStore); + assertEquals(entries.size(), keyList.size()); + for (int i = keyList.size() - 1; i >= 0; i--) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertArrayEquals(valueList.get(i), entries.get(keyList.size() - 1 - i).getValue()); + } + + entries = new SyncKVStore>() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.reverseScan(makeKey("scan_test_key_" + 99), null, closure); + } + }.apply(this.kvStore); + assertEquals(entries.size(), 20); + + entries = new SyncKVStore>() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.reverseScan(null, null, closure); + } + }.apply(this.kvStore); + assertEquals(entries.size(), 20); + + } + /** * Test method: {@link MemoryRawKVStore#getSequence(byte[], int, KVStoreClosure)} */ diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractRheaKVStoreTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractRheaKVStoreTest.java index 6f0d9aa56..270bdd318 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractRheaKVStoreTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rhea/AbstractRheaKVStoreTest.java @@ -232,6 +232,16 @@ public void containsKeyByFollowerTest() { containsKeyTest(getRandomFollowerStore()); } + @Test + public void scanByLeaderTest() { + scanTest(getRandomLeaderStore()); + } + + @Test + public void scanByFollowerTest() { + scanTest(getRandomFollowerStore()); + } + /** * Test method: {@link RheaKVStore#scan(byte[], byte[])} */ @@ -335,16 +345,133 @@ private void scanTest(RheaKVStore store) { entries = store.bScan("no_", null); assertEquals(entries.size(), 20); } + + { + List entries = store.bScan("z", null); + assertEquals(entries.size(), 0); + } } @Test - public void scanByLeaderTest() { - scanTest(getRandomLeaderStore()); + public void reverseScanByLeaderTest() { + reverseScanTest(getRandomLeaderStore()); } @Test - public void scanByFollowerTest() { - scanTest(getRandomFollowerStore()); + public void reverseScanByFollowerTest() { + reverseScanTest(getRandomFollowerStore()); + } + + /** + * Test method: {@link RheaKVStore#reverseScan(byte[], byte[])} + */ + private void reverseScanTest(RheaKVStore store) { + // regions: 1 -> [null, g), 2 -> [g, null) + List keyList = Lists.newArrayList(); + List valueList = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + byte[] key = makeKey("scan_test_key_" + i); + checkRegion(store, key, 2); + byte[] value = makeValue("scan_test_value_" + i); + keyList.add(key); + valueList.add(value); + store.bPut(key, value); + } + for (int i = 0; i < 10; i++) { + byte[] key = makeKey("no_scan_test_key_" + i); + checkRegion(store, key, 2); + byte[] value = makeValue("no_scan_test_value_" + i); + store.bPut(key, value); + } + + // bReverseScan(byte[], byte[]) + { + List entries = store.bReverseScan(makeKey("scan_test_key_" + 99), makeKey("scan_test_key_")); + assertEquals(entries.size(), keyList.size()); + for (int i = 0; i < keyList.size(); i++) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertArrayEquals(valueList.get(i), entries.get(valueList.size() - 1 - i).getValue()); + } + } + + // bReverseScan(String, String) + { + List entries = store.bReverseScan("scan_test_key_" + 99, "scan_test_key_"); + assertEquals(entries.size(), keyList.size()); + for (int i = 0; i < keyList.size(); i++) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertArrayEquals(valueList.get(i), entries.get(valueList.size() - 1 - i).getValue()); + } + } + + // bReverseScan(byte[], byte[], Boolean) + { + List entries = store.bReverseScan(makeKey("scan_test_key_" + 99), makeKey("scan_test_key_"), true); + assertEquals(entries.size(), keyList.size()); + for (int i = 0; i < keyList.size(); i++) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertArrayEquals(valueList.get(i), entries.get(valueList.size() - 1 - i).getValue()); + } + } + + // bReverseScan(String, String, Boolean) + { + List entries = store.bReverseScan("scan_test_key_" + 99, "scan_test_key_", true); + assertEquals(entries.size(), keyList.size()); + for (int i = 0; i < keyList.size(); i++) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertArrayEquals(valueList.get(i), entries.get(valueList.size() - 1 - i).getValue()); + } + } + + // bReverseScan(byte[], byte[], Boolean, Boolean) + { + List entries = store.bReverseScan(makeKey("scan_test_key_" + 99), makeKey("scan_test_key_"), true, + true); + assertEquals(entries.size(), keyList.size()); + for (int i = 0; i < keyList.size(); i++) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertArrayEquals(valueList.get(i), entries.get(valueList.size() - 1 - i).getValue()); + } + + entries = store.bReverseScan(makeKey("scan_test_key_" + 99), makeKey("scan_test_key_"), true, false); + assertEquals(entries.size(), keyList.size()); + for (int i = 0; i < keyList.size(); i++) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertNull(entries.get(i).getValue()); + } + } + + // bReverseScan(String, String, Boolean, Boolean) + { + List entries = store.bReverseScan("scan_test_key_" + 99, "scan_test_key_", true, true); + assertEquals(entries.size(), keyList.size()); + for (int i = 0; i < keyList.size(); i++) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertArrayEquals(valueList.get(i), entries.get(valueList.size() - 1 - i).getValue()); + } + + entries = store.bReverseScan("scan_test_key_" + 99, "scan_test_key_", true, false); + assertEquals(entries.size(), keyList.size()); + for (int i = 0; i < keyList.size(); i++) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertNull(entries.get(i).getValue()); + } + } + + { + List entries = store.bReverseScan(makeKey("no_scan_test_key_" + 99), null); + assertEquals(entries.size(), keyList.size()); + + entries = store.bReverseScan(null, "no_"); + assertEquals(entries.size(), 20); + + } + + { + List entries = store.bReverseScan(null, "z"); + assertEquals(entries.size(), 0); + } } /** diff --git a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rocksdb/RocksKVStoreTest.java b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rocksdb/RocksKVStoreTest.java index 8e38e2307..1f6fa782b 100644 --- a/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rocksdb/RocksKVStoreTest.java +++ b/jraft-rheakv/rheakv-core/src/test/java/com/alipay/sofa/jraft/rhea/storage/rocksdb/RocksKVStoreTest.java @@ -244,6 +244,55 @@ public void execute(RawKVStore kvStore, KVStoreClosure closure) { assertEquals(entries.size(), 20); } + /** + * Test method: {@link RocksRawKVStore#reverseScan(byte[], byte[], KVStoreClosure)} + */ + @Test + public void reverseScanTest() { + final List keyList = Lists.newArrayList(); + final List valueList = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + byte[] key = makeKey("scan_test_key_" + i); + byte[] value = makeValue("scan_test_value_" + i); + keyList.add(key); + valueList.add(value); + this.kvStore.put(key, value, null); + } + for (int i = 0; i < 10; i++) { + byte[] key = makeKey("no_scan_test_key_" + i); + byte[] value = makeValue("no_scan_test_value_" + i); + this.kvStore.put(key, value, null); + } + + List entries = new SyncKVStore>() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.reverseScan(makeKey("scan_test_key_" + 99), makeKey("scan_test_key_"), closure); + } + }.apply(this.kvStore); + assertEquals(entries.size(), keyList.size()); + for (int i = keyList.size() - 1; i >= 0; i--) { + assertArrayEquals(keyList.get(i), entries.get(keyList.size() - 1 - i).getKey()); + assertArrayEquals(valueList.get(i), entries.get(keyList.size() - 1 - i).getValue()); + } + + entries = new SyncKVStore>() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.reverseScan(makeKey("scan_test_key_" + 99), null, closure); + } + }.apply(this.kvStore); + assertEquals(entries.size(), 20); + + entries = new SyncKVStore>() { + @Override + public void execute(RawKVStore kvStore, KVStoreClosure closure) { + kvStore.reverseScan(null, null, closure); + } + }.apply(this.kvStore); + assertEquals(entries.size(), 20); + } + /** * Test method: {@link RocksRawKVStore#getSequence(byte[], int, KVStoreClosure)} */