Skip to content

Commit

Permalink
feat: rheakv support scan in reverse
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Apr 26, 2020
1 parent be54c3c commit 100d11d
Show file tree
Hide file tree
Showing 18 changed files with 805 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.example.rheakv;

import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
import com.alipay.sofa.jraft.rhea.storage.KVEntry;
import com.alipay.sofa.jraft.rhea.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;

/**
*
* @author baozi
*/
public class ReverseScanExample {

private static final Logger LOG = LoggerFactory.getLogger(ReverseScanExample.class);

public static void main(final String[] args) throws Exception {
final Client client = new Client();
client.init();
scan(client.getRheaKVStore());
client.shutdown();
}

@SuppressWarnings("unchecked")
public static void scan(final RheaKVStore rheaKVStore) {
final List<byte[]> keys = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
final byte[] bytes = writeUtf8("scan_demo_" + i);
keys.add(bytes);
rheaKVStore.bPut(bytes, bytes);
}

final byte[] firstKey = keys.get(keys.size() - 1);
final byte[] lastKey = keys.get(0);
final String firstKeyString = readUtf8(firstKey);
final String lastKeyString = readUtf8(lastKey);

// async scan
final CompletableFuture<List<KVEntry>> f1 = rheaKVStore.reverseScan(firstKey, lastKey);
final CompletableFuture<List<KVEntry>> f2 = rheaKVStore.reverseScan(firstKey, lastKey, false);
final CompletableFuture<List<KVEntry>> f3 = rheaKVStore.reverseScan(firstKeyString, lastKeyString);
final CompletableFuture<List<KVEntry>> f4 = rheaKVStore.reverseScan(firstKeyString, lastKeyString, false);
CompletableFuture.allOf(f1, f2, f3, f4).join();
for (final CompletableFuture<List<KVEntry>> f : new CompletableFuture[] { f1, f2, f3, f4 }) {
for (final KVEntry kv : f.join()) {
LOG.info("Async reverseScan: key={}, value={}", readUtf8(kv.getKey()), readUtf8(kv.getValue()));
}
}

// sync scan
final List<KVEntry> l1 = rheaKVStore.bReverseScan(firstKey, lastKey);
final List<KVEntry> l2 = rheaKVStore.bReverseScan(firstKey, lastKey, false);
final List<KVEntry> l3 = rheaKVStore.bReverseScan(firstKeyString, lastKeyString);
final List<KVEntry> l4 = rheaKVStore.bReverseScan(firstKeyString, lastKeyString, false);
for (final List<KVEntry> l : new List[] { l1, l2, l3, l4 }) {
for (final KVEntry kv : l) {
LOG.info("sync reverseScan: key={}, value={}", readUtf8(kv.getKey()), readUtf8(kv.getValue()));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public static void scan(final RheaKVStore rheaKVStore) {
final List<KVEntry> l4 = rheaKVStore.bScan(firstKeyString, lastKeyString, false);
for (final List<KVEntry> l : new List[] { l1, l2, l3, l4 }) {
for (final KVEntry kv : l) {
LOG.info("Async scan: key={}, value={}", readUtf8(kv.getKey()), readUtf8(kv.getValue()));
LOG.info("sync scan: key={}, value={}", readUtf8(kv.getKey()), readUtf8(kv.getValue()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,20 +455,26 @@ public void handleScanRequest(final ScanRequest request,
response.setRegionEpoch(getRegionEpoch());
try {
KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
this.rawKVStore.scan(request.getStartKey(), request.getEndKey(), request.getLimit(),
request.isReadOnlySafe(), request.isReturnValue(), new BaseKVStoreClosure() {

@SuppressWarnings("unchecked")
@Override
public void run(final Status status) {
if (status.isOk()) {
response.setValue((List<KVEntry>) getData());
} else {
setFailure(request, response, status, getError());
}
closure.sendResponse(response);
BaseKVStoreClosure KVStoreClosure = new BaseKVStoreClosure() {

@SuppressWarnings("unchecked")
@Override
public void run(final Status status) {
if (status.isOk()) {
response.setValue((List<KVEntry>) getData());
} else {
setFailure(request, response, status, getError());
}
});
closure.sendResponse(response);
}
};
if (request.isReverse()) {
this.rawKVStore.reverseScan(request.getStartKey(), request.getEndKey(), request.getLimit(),
request.isReadOnlySafe(), request.isReturnValue(), KVStoreClosure);
} else {
this.rawKVStore.scan(request.getStartKey(), request.getEndKey(), request.getLimit(),
request.isReadOnlySafe(), request.isReturnValue(), KVStoreClosure);
}
} catch (final Throwable t) {
LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
response.setError(Errors.forException(t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alipay.sofa.jraft.rhea.client;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -610,8 +611,7 @@ private FutureGroup<List<KVEntry>> internalScan(final byte[] startKey, final byt
final boolean readOnlySafe, final boolean returnValue,
final int retriesLeft, final Throwable lastCause) {
Requires.requireNonNull(startKey, "startKey");
final List<Region> regionList = this.pdClient
.findRegionsByKeyRange(startKey, endKey, ApiExceptionHelper.isInvalidEpoch(lastCause));
final List<Region> regionList = this.pdClient.findRegionsByKeyRange(startKey, endKey, ApiExceptionHelper.isInvalidEpoch(lastCause));
final List<CompletableFuture<List<KVEntry>>> futures = Lists.newArrayListWithCapacity(regionList.size());
final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);
for (final Region region : regionList) {
Expand All @@ -623,31 +623,40 @@ private FutureGroup<List<KVEntry>> internalScan(final byte[] startKey, final byt
final ListRetryCallable<KVEntry> retryCallable = retryCause -> internalScan(subStartKey, subEndKey,
readOnlySafe, returnValue, retriesLeft - 1, retryCause);
final ListFailoverFuture<KVEntry> future = new ListFailoverFuture<>(retriesLeft, retryCallable);
internalRegionScan(region, subStartKey, subEndKey, readOnlySafe, returnValue, future, retriesLeft, lastError,
internalRegionScan(region, subStartKey, subEndKey, false, readOnlySafe, returnValue, future, retriesLeft, lastError,
this.onlyLeaderRead);
futures.add(future);
}
return new FutureGroup<>(futures);
}

private void internalRegionScan(final Region region, final byte[] subStartKey, final byte[] subEndKey,
final boolean readOnlySafe, final boolean returnValue,
final boolean reverse, final boolean readOnlySafe, final boolean returnValue,
final CompletableFuture<List<KVEntry>> future, final int retriesLeft,
final Errors lastCause, final boolean requireLeader) {
final RegionEngine regionEngine = getRegionEngine(region.getId(), requireLeader);
// require leader on retry
final RetryRunner retryRunner = retryCause -> internalRegionScan(region, subStartKey, subEndKey, readOnlySafe,
final RetryRunner retryRunner = retryCause -> internalRegionScan(region, subStartKey, subEndKey, reverse, readOnlySafe,
returnValue, future, retriesLeft - 1, retryCause, true);
final FailoverClosure<List<KVEntry>> closure = new FailoverClosureImpl<>(future, false,
retriesLeft, retryRunner);
if (regionEngine != null) {
if (ensureOnValidEpoch(region, regionEngine, closure)) {
final RawKVStore rawKVStore = getRawKVStore(regionEngine);
if (this.kvDispatcher == null) {
rawKVStore.scan(subStartKey, subEndKey, readOnlySafe, returnValue, closure);
if (reverse) {
if (this.kvDispatcher == null) {
rawKVStore.reverseScan(subStartKey, subEndKey, readOnlySafe, returnValue, closure);
} else {
this.kvDispatcher.execute(
() -> rawKVStore.reverseScan(subStartKey, subEndKey, readOnlySafe, returnValue, closure));
}
} else {
this.kvDispatcher.execute(
() -> rawKVStore.scan(subStartKey, subEndKey, readOnlySafe, returnValue, closure));
if (this.kvDispatcher == null) {
rawKVStore.scan(subStartKey, subEndKey, readOnlySafe, returnValue, closure);
} else {
this.kvDispatcher.execute(
() -> rawKVStore.scan(subStartKey, subEndKey, readOnlySafe, returnValue, closure));
}
}
}
} else {
Expand All @@ -658,10 +667,107 @@ private void internalRegionScan(final Region region, final byte[] subStartKey, f
request.setReturnValue(returnValue);
request.setRegionId(region.getId());
request.setRegionEpoch(region.getRegionEpoch());
request.setReverse(reverse);
this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause, requireLeader);
}
}

@Override
public CompletableFuture<List<KVEntry>> reverseScan(final byte[] startKey, final byte[] endKey) {
return reverseScan(startKey, endKey, true);
}

@Override
public CompletableFuture<List<KVEntry>> reverseScan(final String startKey, final String endKey) {
return reverseScan(BytesUtil.writeUtf8(startKey), BytesUtil.writeUtf8(endKey));
}

@Override
public CompletableFuture<List<KVEntry>> reverseScan(final byte[] startKey, final byte[] endKey,
final boolean readOnlySafe) {
return reverseScan(startKey, endKey, readOnlySafe, true);
}

@Override
public CompletableFuture<List<KVEntry>> reverseScan(final String startKey, final String endKey,
final boolean readOnlySafe) {
return reverseScan(BytesUtil.writeUtf8(startKey), BytesUtil.writeUtf8(endKey), readOnlySafe);
}

@Override
public CompletableFuture<List<KVEntry>> reverseScan(final byte[] startKey, final byte[] endKey,
final boolean readOnlySafe, final boolean returnValue) {
checkState();
final byte[] realEndKey = BytesUtil.nullToEmpty(endKey);
if (startKey != null) {
Requires.requireTrue(BytesUtil.compare(startKey, realEndKey) > 0, "startKey must > endKey");
}
final FutureGroup<List<KVEntry>> futureGroup = internalReverseScan(startKey, realEndKey, readOnlySafe,
returnValue, this.failoverRetries, null);
return FutureHelper.joinList(futureGroup);
}

@Override
public CompletableFuture<List<KVEntry>> reverseScan(final String startKey, final String endKey,
final boolean readOnlySafe, final boolean returnValue) {
return reverseScan(BytesUtil.writeUtf8(startKey), BytesUtil.writeUtf8(endKey), readOnlySafe, returnValue);
}

@Override
public List<KVEntry> bReverseScan(final byte[] startKey, final byte[] endKey) {
return FutureHelper.get(reverseScan(startKey, endKey), this.futureTimeoutMillis);
}

@Override
public List<KVEntry> bReverseScan(final String startKey, final String endKey) {
return FutureHelper.get(reverseScan(startKey, endKey), this.futureTimeoutMillis);
}

@Override
public List<KVEntry> bReverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe) {
return FutureHelper.get(reverseScan(startKey, endKey, readOnlySafe), this.futureTimeoutMillis);
}

@Override
public List<KVEntry> bReverseScan(final String startKey, final String endKey, final boolean readOnlySafe) {
return FutureHelper.get(reverseScan(startKey, endKey, readOnlySafe), this.futureTimeoutMillis);
}

@Override
public List<KVEntry> bReverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe,
final boolean returnValue) {
return FutureHelper.get(reverseScan(startKey, endKey, readOnlySafe, returnValue), this.futureTimeoutMillis);
}

@Override
public List<KVEntry> bReverseScan(final String startKey, final String endKey, final boolean readOnlySafe,
final boolean returnValue) {
return FutureHelper.get(reverseScan(startKey, endKey, readOnlySafe, returnValue), this.futureTimeoutMillis);
}

private FutureGroup<List<KVEntry>> internalReverseScan(final byte[] startKey, final byte[] endKey,
final boolean readOnlySafe, final boolean returnValue,
final int retriesLeft, final Throwable lastCause) {
Requires.requireNonNull(endKey, "endKey");
final List<Region> regionList = this.pdClient.findRegionsByKeyRange(endKey, startKey, ApiExceptionHelper.isInvalidEpoch(lastCause));
Collections.reverse(regionList);
final List<CompletableFuture<List<KVEntry>>> futures = Lists.newArrayListWithCapacity(regionList.size());
final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);
for (final Region region : regionList) {
final byte[] regionEndKey = region.getEndKey();
final byte[] regionStartKey = region.getStartKey();
final byte[] subStartKey = regionEndKey == null ? startKey : (startKey == null ? regionEndKey : BytesUtil.min(regionEndKey, startKey));
final byte[] subEndKey = regionStartKey == null ? endKey : BytesUtil.max(regionStartKey, endKey);
final ListRetryCallable<KVEntry> retryCallable = retryCause -> internalReverseScan(subStartKey, subEndKey,
readOnlySafe, returnValue, retriesLeft - 1, retryCause);
final ListFailoverFuture<KVEntry> future = new ListFailoverFuture<>(retriesLeft, retryCallable);
internalRegionScan(region, subStartKey, subEndKey, true, readOnlySafe, returnValue, future, retriesLeft, lastError,
this.onlyLeaderRead );
futures.add(future);
}
return new FutureGroup<>(futures);
}

public List<KVEntry> singleRegionScan(final byte[] startKey, final byte[] endKey, final int limit,
final boolean readOnlySafe, final boolean returnValue) {
checkState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,86 @@ List<KVEntry> bScan(final byte[] startKey, final byte[] endKey, final boolean re
List<KVEntry> bScan(final String startKey, final String endKey, final boolean readOnlySafe,
final boolean returnValue);

/**
* Equivalent to {@code reverseScan(startKey, endKey, true)}.
*/
CompletableFuture<List<KVEntry>> reverseScan(final byte[] startKey, final byte[] endKey);

/**
* @see #reverseScan(byte[], byte[])
*/
CompletableFuture<List<KVEntry>> reverseScan(final String startKey, final String endKey);

/**
* Equivalent to {@code reverseScan(startKey, endKey, readOnlySafe, true)}.
*/
CompletableFuture<List<KVEntry>> reverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe);

/**
* @see #reverseScan(byte[], byte[], boolean)
*/
CompletableFuture<List<KVEntry>> reverseScan(final String startKey, final String endKey, final boolean readOnlySafe);

/**
* Reverse query all data in the key of range [startKey, endKey).
* <p>
* Provide consistent reading if {@code readOnlySafe} is true.
*
* Reverse scanning is usually much worse than forward scanning.
*
* Reverse scanning across multi regions maybe slower and devastating.
*
* @param startKey first key to reverse scan within database (included),
* null means 'max-key' in the database.
* @param endKey last key to reverse scan within database (excluded).
* null means 'min-key' in the database.
* @param readOnlySafe provide consistent reading if {@code readOnlySafe}
* is true.
* @param returnValue whether to return value.
* @return a list where the key of range [startKey, endKey) passed by user
* and value for {@code KVEntry}
*/
CompletableFuture<List<KVEntry>> reverseScan(final byte[] startKey, final byte[] endKey,
final boolean readOnlySafe, final boolean returnValue);

/**
* @see #reverseScan(byte[], byte[], boolean, boolean)
*/
CompletableFuture<List<KVEntry>> reverseScan(final String startKey, final String endKey,
final boolean readOnlySafe, final boolean returnValue);

/**
* @see #reverseScan(byte[], byte[])
*/
List<KVEntry> bReverseScan(final byte[] startKey, final byte[] endKey);

/**
* @see #scan(String, String)
*/
List<KVEntry> bReverseScan(final String startKey, final String endKey);

/**
* @see #scan(String, String, boolean)
*/
List<KVEntry> bReverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe);

/**
* @see #scan(String, String, boolean)
*/
List<KVEntry> bReverseScan(final String startKey, final String endKey, final boolean readOnlySafe);

/**
* @see #reverseScan(String, String, boolean, boolean)
*/
List<KVEntry> bReverseScan(final byte[] startKey, final byte[] endKey, final boolean readOnlySafe,
final boolean returnValue);

/**
* @see #reverseScan(String, String, boolean, boolean)
*/
List<KVEntry> bReverseScan(final String startKey, final String endKey, final boolean readOnlySafe,
final boolean returnValue);

/**
* Equivalent to {@code iterator(startKey, endKey, bufSize, true)}.
*/
Expand Down
Loading

0 comments on commit 100d11d

Please sign in to comment.