diff --git a/scripts/format-all.sh b/scripts/format-all.sh index 995bc3f4..e3f9218a 100755 --- a/scripts/format-all.sh +++ b/scripts/format-all.sh @@ -11,6 +11,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java src/main/java/com/xiaomi/infra/pegasus/operator/*.java src/main/java/com/xiaomi/infra/pegasus/tools/*.java src/main/java/com/xiaomi/infra/pegasus/base/*.java + src/main/java/com/xiaomi/infra/pegasus/client/request/*.java src/main/java/com/xiaomi/infra/pegasus/example/*.java src/test/java/com/xiaomi/infra/pegasus/client/*.java src/test/java/com/xiaomi/infra/pegasus/metrics/*.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java b/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java index c7a92ba9..bb21b95b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java @@ -6,29 +6,39 @@ import io.netty.util.concurrent.Future; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.tuple.Pair; public class FutureGroup { + private boolean forceComplete; public FutureGroup(int initialCapacity) { - asyncTasks = new ArrayList<>(initialCapacity); + this(initialCapacity, true); + } + + public FutureGroup(int initialCapacity, boolean forceComplete) { + this.asyncTasks = new ArrayList<>(initialCapacity); + this.forceComplete = forceComplete; } public void add(Future task) { asyncTasks.add(task); } - public void waitAllCompleteOrOneFail(int timeoutMillis) throws PException { - waitAllCompleteOrOneFail(null, timeoutMillis); + public void waitAllComplete(int timeoutMillis) throws PException { + List> results = new ArrayList<>(); + waitAllComplete(results, timeoutMillis); } /** * Waits until all future tasks complete but terminate if one fails. * - * @param results is nullable, each element is the result of the Future. + * @param results . */ - public void waitAllCompleteOrOneFail(List results, int timeoutMillis) throws PException { + public int waitAllComplete(List> results, int timeoutMillis) + throws PException { int timeLimit = timeoutMillis; long duration = 0; + int count = 0; for (int i = 0; i < asyncTasks.size(); i++) { Future fu = asyncTasks.get(i); try { @@ -40,20 +50,26 @@ public void waitAllCompleteOrOneFail(List results, int timeoutMillis) th throw new PException("async task #[" + i + "] await failed: " + e.toString()); } - if (fu.isSuccess() && timeLimit >= 0) { - if (results != null) { - results.set(i, fu.getNow()); - } + if (timeLimit < 0) { + throw new PException( + String.format("async task #[" + i + "] failed: timeout expired (%dms)", timeoutMillis)); + } + + if (fu.isSuccess()) { + count++; + results.add(Pair.of(null, fu.getNow())); } else { Throwable cause = fu.cause(); - if (cause == null) { - throw new PException( - String.format( - "async task #[" + i + "] failed: timeout expired (%dms)", timeoutMillis)); + if (forceComplete) { + throw new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause); } - throw new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause); + results.add( + Pair.of( + new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause), + null)); } } + return count; } private List> asyncTasks; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index 73c56b7e..c152e34c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -3,6 +3,19 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult; +import com.xiaomi.infra.pegasus.client.request.BatchDelete; +import com.xiaomi.infra.pegasus.client.request.BatchGet; +import com.xiaomi.infra.pegasus.client.request.BatchSet; +import com.xiaomi.infra.pegasus.client.request.Delete; +import com.xiaomi.infra.pegasus.client.request.Get; +import com.xiaomi.infra.pegasus.client.request.Increment; +import com.xiaomi.infra.pegasus.client.request.MultiDelete; +import com.xiaomi.infra.pegasus.client.request.MultiGet; +import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.RangeDelete; +import com.xiaomi.infra.pegasus.client.request.RangeGet; +import com.xiaomi.infra.pegasus.client.request.Set; import com.xiaomi.infra.pegasus.rpc.*; import com.xiaomi.infra.pegasus.tools.Tools; import java.nio.ByteBuffer; @@ -199,6 +212,94 @@ public ClientOptions getConfiguration() { return clientOptions; } + @Override + public boolean exist(String tableName, Get get) throws PException { + PegasusTable tb = getTable(tableName); + return tb.exist(get, 0); + } + + @Override + public byte[] get(String tableName, Get get) throws PException { + PegasusTable tb = getTable(tableName); + return tb.get(get, 0); + } + + @Override + public void batchGet(String tableName, BatchGet batchGet, List> results) + throws PException { + PegasusTable tb = getTable(tableName); + tb.batchGet(batchGet, results, 0); + } + + @Override + public MultiGetResult multiGet(String tableName, MultiGet multiGet) throws PException { + PegasusTable tb = getTable(tableName); + return tb.multiGet(multiGet, 0); + } + + @Override + public MultiGetResult rangeGet(String tableName, RangeGet rangeGet) throws PException { + PegasusTable tb = getTable(tableName); + return tb.rangeGet(rangeGet, 0); + } + + @Override + public void set(String tableName, Set set) throws PException { + PegasusTable tb = getTable(tableName); + tb.set(set, 0); + } + + @Override + public void batchSet(String tableName, BatchSet batchSet, List> results) + throws PException { + PegasusTable tb = getTable(tableName); + tb.batchSet(batchSet, results, 0); + } + + @Override + public void multiSet(String tableName, MultiSet multiSet) throws PException { + PegasusTable tb = getTable(tableName); + tb.multiSet(multiSet, 0); + } + + @Override + public void del(String tableName, Delete delete) throws PException { + PegasusTable tb = getTable(tableName); + tb.del(delete, 0); + } + + @Override + public void batchDel( + String tableName, BatchDelete batchDelete, List> results) + throws PException { + PegasusTable tb = getTable(tableName); + tb.batchDel(batchDelete, results, 0); + } + + @Override + public void multiDel(String tableName, MultiDelete multiDelete) throws PException { + PegasusTable tb = getTable(tableName); + tb.multiDel(multiDelete, 0); + } + + @Override + public void rangeDel(String tableName, RangeDelete rangeDelete) throws PException { + PegasusTable tb = getTable(tableName); + tb.rangeDel(rangeDelete, 0); + } + + @Override + public int ttl(String tableName, Get get) throws PException { + PegasusTable tb = getTable(tableName); + return tb.ttl(get, 0); + } + + @Override + public long incr(String tableName, Increment increment) throws PException { + PegasusTable tb = getTable(tableName); + return tb.incr(increment, 0); + } + @Override public boolean exist(String tableName, byte[] hashKey, byte[] sortKey) throws PException { PegasusTable tb = getTable(tableName); diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index f28f9a26..949ff6eb 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -3,6 +3,19 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult; +import com.xiaomi.infra.pegasus.client.request.BatchDelete; +import com.xiaomi.infra.pegasus.client.request.BatchGet; +import com.xiaomi.infra.pegasus.client.request.BatchSet; +import com.xiaomi.infra.pegasus.client.request.Delete; +import com.xiaomi.infra.pegasus.client.request.Get; +import com.xiaomi.infra.pegasus.client.request.Increment; +import com.xiaomi.infra.pegasus.client.request.MultiDelete; +import com.xiaomi.infra.pegasus.client.request.MultiGet; +import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.RangeDelete; +import com.xiaomi.infra.pegasus.client.request.RangeGet; +import com.xiaomi.infra.pegasus.client.request.Set; import java.util.*; import org.apache.commons.lang3.tuple.Pair; @@ -64,6 +77,38 @@ public interface PegasusClientInterface { public PegasusTableInterface openTable(String tableName, int backupRequestDelayMs) throws PException; + public boolean exist(String tableName, Get get) throws PException; + + public byte[] get(String tableName, Get get) throws PException; + + public void batchGet(String tableName, BatchGet batchGet, List> results) + throws PException; + + public MultiGetResult multiGet(String tableName, MultiGet multiGet) throws PException; + + public MultiGetResult rangeGet(String tableName, RangeGet rangeGet) throws PException; + + public void set(String tableName, Set set) throws PException; + + public void batchSet(String tableName, BatchSet batchSet, List> results) + throws PException; + + public void multiSet(String tableName, MultiSet multiSet) throws PException; + + public void del(String tableName, Delete delete) throws PException; + + public void batchDel( + String tableName, BatchDelete batchDelete, List> results) + throws PException; + + public void multiDel(String tableName, MultiDelete multiDelete) throws PException; + + public void rangeDel(String tableName, RangeDelete rangeDelete) throws PException; + + public int ttl(String tableName, Get get) throws PException; + + public long incr(String tableName, Increment increment) throws PException; + /** * Check value exist by key from the cluster * @@ -73,6 +118,7 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM * @return true if exist, false if not exist * @throws PException throws exception if any error occurs. */ + @Deprecated public boolean exist(String tableName, byte[] hashKey, byte[] sortKey) throws PException; /** @@ -94,6 +140,7 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM * @return value; null if not found * @throws PException throws exception if any error occurs. */ + @Deprecated public byte[] get(String tableName, byte[] hashKey, byte[] sortKey) throws PException; /** @@ -108,6 +155,7 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchGet(String tableName, List> keys, List values) throws PException; @@ -125,6 +173,7 @@ public void batchGet(String tableName, List> keys, ListNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchGet2( String tableName, List> keys, List> results) throws PException; @@ -146,6 +195,7 @@ public int batchGet2( * @return true if all data is fetched; false if only partial data is fetched. * @throws PException throws exception if any error occurs. */ + @Deprecated public boolean multiGet( String tableName, byte[] hashKey, @@ -155,6 +205,7 @@ public boolean multiGet( List> values) throws PException; + @Deprecated public boolean multiGet( String tableName, byte[] hashKey, List sortKeys, List> values) throws PException; @@ -175,6 +226,7 @@ public boolean multiGet( * @return true if all data is fetched; false if only partial data is fetched. * @throws PException throws exception if any error occurs. */ + @Deprecated public boolean multiGet( String tableName, byte[] hashKey, @@ -186,6 +238,7 @@ public boolean multiGet( List> values) throws PException; + @Deprecated public boolean multiGet( String tableName, byte[] hashKey, @@ -208,6 +261,7 @@ public boolean multiGet( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiGet( String tableName, List>> keys, List values) throws PException; @@ -228,6 +282,7 @@ public void batchMultiGet( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiGet2( String tableName, List>> keys, @@ -266,9 +321,11 @@ public boolean multiGetSortKeys(String tableName, byte[] hashKey, List s * @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0. * @throws PException throws exception if any error occurs. */ + @Deprecated public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds) throws PException; + @Deprecated public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value) throws PException; /** @@ -280,6 +337,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value, *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchSet(String tableName, List items) throws PException; /** @@ -296,6 +354,7 @@ public void set(String tableName, byte[] hashKey, byte[] sortKey, byte[] value, *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchSet2(String tableName, List items, List results) throws PException; @@ -308,10 +367,12 @@ public int batchSet2(String tableName, List items, List res * @param ttlSeconds time to live in seconds, 0 means no ttl. default value is 0. * @throws PException throws exception if any error occurs. */ + @Deprecated public void multiSet( String tableName, byte[] hashKey, List> values, int ttlSeconds) throws PException; + @Deprecated public void multiSet(String tableName, byte[] hashKey, List> values) throws PException; @@ -326,9 +387,11 @@ public void multiSet(String tableName, byte[] hashKey, List *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiSet(String tableName, List items, int ttlSeconds) throws PException; + @Deprecated public void batchMultiSet(String tableName, List items) throws PException; /** @@ -347,10 +410,12 @@ public void batchMultiSet(String tableName, List items, int ttlSeco *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiSet2( String tableName, List items, int ttlSeconds, List results) throws PException; + @Deprecated public int batchMultiSet2(String tableName, List items, List results) throws PException; @@ -364,6 +429,7 @@ public int batchMultiSet2(String tableName, List items, List items, ListNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchDel(String tableName, List> keys) throws PException; /** @@ -392,6 +459,7 @@ public int batchMultiSet2(String tableName, List items, ListNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchDel2(String tableName, List> keys, List results) throws PException; @@ -403,6 +471,7 @@ public int batchDel2(String tableName, List> keys, List sortKeys) throws PException; /** @@ -416,6 +485,7 @@ public int batchDel2(String tableName, List> keys, ListNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiDel(String tableName, List>> keys) throws PException; @@ -452,6 +523,7 @@ public void batchMultiDel(String tableName, List>> key *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiDel2( String tableName, List>> keys, List results) throws PException; @@ -467,6 +539,7 @@ public int batchMultiDel2( * @return ttl time in seconds; -1 if no ttl set; -2 if not exist. * @throws PException throws exception if any error occurs. */ + @Deprecated public int ttl(String tableName, byte[] hashKey, byte[] sortKey) throws PException; /** @@ -478,16 +551,17 @@ public int batchMultiDel2( * @param increment the increment to be added to the old value. * @param ttlSeconds time to live in seconds for the new value. should be no less than -1. for the * second method, the ttlSeconds is 0. - if ttlSeconds == 0, the semantic is the same as - * redis: - normally, increment will preserve the original ttl. - if old data is expired by - * ttl, then set initial value to 0 and set no ttl. - if ttlSeconds > 0, then update with the - * new ttl if increment succeed. - if ttlSeconds == -1, then update to no ttl if increment - * succeed. + * redis: - normally, value will preserve the original ttl. - if old data is expired by ttl, + * then set initial value to 0 and set no ttl. - if ttlSeconds > 0, then update with the new + * ttl if value succeed. - if ttlSeconds == -1, then update to no ttl if value succeed. * @return the new value. * @throws PException throws exception if any error occurs. */ + @Deprecated public long incr(String tableName, byte[] hashKey, byte[] sortKey, long increment, int ttlSeconds) throws PException; + @Deprecated public long incr(String tableName, byte[] hashKey, byte[] sortKey, long increment) throws PException; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 15f64e43..f31f3511 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -7,6 +7,18 @@ import com.xiaomi.infra.pegasus.base.blob; import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.gpid; +import com.xiaomi.infra.pegasus.client.request.BatchDelete; +import com.xiaomi.infra.pegasus.client.request.BatchGet; +import com.xiaomi.infra.pegasus.client.request.BatchSet; +import com.xiaomi.infra.pegasus.client.request.Delete; +import com.xiaomi.infra.pegasus.client.request.Get; +import com.xiaomi.infra.pegasus.client.request.Increment; +import com.xiaomi.infra.pegasus.client.request.MultiDelete; +import com.xiaomi.infra.pegasus.client.request.MultiGet; +import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.RangeDelete; +import com.xiaomi.infra.pegasus.client.request.RangeGet; +import com.xiaomi.infra.pegasus.client.request.Set; import com.xiaomi.infra.pegasus.operator.*; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; @@ -43,6 +55,443 @@ public PegasusTable(PegasusClient client, Table table) { this.metaList = client.getMetaList(); } + @Override + public Future asyncExist(Get get, int timeout) { + final DefaultPromise promise = table.newPromise(); + asyncTTL(get, timeout) + .addListener( + new TTLListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + promise.setSuccess(future.get() != -2); + } else { + promise.setFailure(future.cause()); + } + } + }); + return promise; + } + + @Override + public Future asyncTTL(Get get, int timeout) { + final DefaultPromise promise = table.newPromise(); + blob request = new blob(PegasusClient.generateKey(get.hashKey, get.sortKey)); + + long partitionHash = table.getHash(request.data); + gpid pid = table.getGpidByHash(partitionHash); + rrdb_ttl_operator op = new rrdb_ttl_operator(pid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_ttl_operator op2 = (rrdb_ttl_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(get.hashKey, get.sortKey), promise, op, table, timeout); + } else if (op2.get_response().error != 0 && op2.get_response().error != 1) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + // On success: ttl time in seconds; -1 if no ttl set; -2 if not exist. + // If not exist, the error code of rpc response is kNotFound(1). + promise.setSuccess( + op2.get_response().error == 1 ? -2 : op2.get_response().ttl_seconds); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncGet(Get get, int timeout) { + final DefaultPromise promise = table.newPromise(); + blob request = new blob(PegasusClient.generateKey(get.hashKey, get.sortKey)); + long partitionHash = table.getHash(request.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_get_operator op = + new rrdb_get_operator(gpid, table.getTableName(), request, partitionHash); + Table.ClientOPCallback callback = + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_get_operator gop = (rrdb_get_operator) clientOP; + if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(get.hashKey, get.sortKey), promise, op, table, timeout); + } else if (gop.get_response().error == 1) { // rocksdb::kNotFound + promise.setSuccess(null); + } else if (gop.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); + } else { + promise.setSuccess(gop.get_response().value.data); + } + } + }; + + table.asyncOperate(op, callback, timeout); + return promise; + } + + @Override + public Future asyncRangeGet(RangeGet rangeGet, int timeout) throws PException { + final DefaultPromise promise = table.newPromise(); + + blob hashKeyBlob = new blob(rangeGet.hashKey); + blob startSortKeyBlob = + (rangeGet.startSortKey == null ? null : new blob(rangeGet.startSortKey)); + blob stopSortKeyBlob = (rangeGet.stopSortKey == null ? null : new blob(rangeGet.stopSortKey)); + blob sortKeyFilterPatternBlob = + (rangeGet.externOptions.sortKeyFilterPattern == null + ? null + : new blob(rangeGet.externOptions.sortKeyFilterPattern)); + + multi_get_request request = + new multi_get_request( + hashKeyBlob, + null, + rangeGet.maxFetchCount, + rangeGet.maxFetchSize, + rangeGet.externOptions.noValue, + startSortKeyBlob, + stopSortKeyBlob, + rangeGet.externOptions.startInclusive, + rangeGet.externOptions.stopInclusive, + filter_type.findByValue(rangeGet.externOptions.sortKeyFilterType.getValue()), + sortKeyFilterPatternBlob, + rangeGet.externOptions.reverse); + long partitionHash = table.getKeyHash(request.hash_key.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_multi_get_operator op = + new rrdb_multi_get_operator(gpid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; + if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(rangeGet.hashKey, rangeGet.maxFetchCount), + promise, + op, + table, + timeout); + } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { + // rocksdb::Status::kOk && rocksdb::Status::kIncomplete + promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); + } else { + MultiGetResult result = new MultiGetResult(); + result.allFetched = (gop.get_response().error == 0); + result.values = new ArrayList>(gop.get_response().kvs.size()); + for (key_value kv : gop.get_response().kvs) { + result.values.add(new ImmutablePair(kv.key.data, kv.value.data)); + } + promise.setSuccess(result); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncMultiGet(MultiGet multiGet, int timeout) { + final DefaultPromise promise = table.newPromise(); + + blob hashKeyBlob = new blob(multiGet.hashKey); + List sortKeyBlobs = new ArrayList(); + Map setKeyMap = null; + + if (multiGet.sortKeys != null && multiGet.sortKeys.size() > 0) { + setKeyMap = new TreeMap(); + for (int i = 0; i < multiGet.sortKeys.size(); i++) { + byte[] sortKey = multiGet.sortKeys.get(i); + if (sortKey == null) { + promise.setFailure( + new PException("Invalid parameter: sortKeys[" + i + "] should not be null")); + return promise; + } + setKeyMap.put(ByteBuffer.wrap(sortKey), sortKey); + } + for (Map.Entry entry : setKeyMap.entrySet()) { + sortKeyBlobs.add(new blob(entry.getValue())); + } + } + + multi_get_request request = + new multi_get_request( + hashKeyBlob, + sortKeyBlobs, + multiGet.maxFetchCount, + multiGet.maxFetchCount, + multiGet.noValue, + null, + null, + true, + false, + filter_type.FT_NO_FILTER, + null, + false); + long partitionHash = table.getKeyHash(request.hash_key.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_multi_get_operator op = + new rrdb_multi_get_operator(gpid, table.getTableName(), request, partitionHash); + final Map finalSetKeyMap = setKeyMap; + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; + if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(multiGet.hashKey, sortKeyBlobs.size()), promise, op, table, timeout); + } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { + // rocksdb::Status::kOk && rocksdb::Status::kIncomplete + promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); + } else { + MultiGetResult result = new MultiGetResult(); + result.allFetched = (gop.get_response().error == 0); + result.values = new ArrayList>(gop.get_response().kvs.size()); + if (finalSetKeyMap == null) { + for (key_value kv : gop.get_response().kvs) { + result.values.add(new ImmutablePair(kv.key.data, kv.value.data)); + } + } else { + for (key_value kv : gop.get_response().kvs) { + byte[] sortKey = finalSetKeyMap.get(ByteBuffer.wrap(kv.key.data)); + if (sortKey != null) { + result.values.add(new ImmutablePair(sortKey, kv.value.data)); + } + } + } + promise.setSuccess(result); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncSet(Set set, int timeout) { + final DefaultPromise promise = table.newPromise(); + + try { + writeLimiter.validateSingleSet(set.hashKey, set.sortKey, set.value); + } catch (IllegalArgumentException e) { + handleWriteLimiterException(promise, e.getMessage()); + return promise; + } + + blob k = new blob(PegasusClient.generateKey(set.hashKey, set.sortKey)); + blob v = new blob(set.value); + int expireSeconds = (set.ttlSeconds == 0 ? 0 : set.ttlSeconds + (int) Tools.epoch_now()); + update_request req = new update_request(k, v, expireSeconds); + + long partitionHash = table.getHash(k.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_put_operator op = new rrdb_put_operator(gpid, table.getTableName(), req, partitionHash); + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_put_operator gop = (rrdb_put_operator) clientOP; + if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(set.hashKey, set.sortKey), promise, op, table, timeout); + } else if (gop.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); + } else { + promise.setSuccess(null); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncMultiSet(MultiSet multiSet, int timeout) { + final DefaultPromise promise = table.newPromise(); + if (multiSet.values == null || multiSet.values.size() == 0) { + promise.setFailure(new PException("Invalid parameter: values should not be null or empty")); + return promise; + } + + try { + writeLimiter.validateMultiSet(multiSet.hashKey, multiSet.values); + } catch (IllegalArgumentException e) { + handleWriteLimiterException(promise, e.getMessage()); + return promise; + } + + blob hash_key_blob = new blob(multiSet.hashKey); + List values_blob = new ArrayList(); + for (int i = 0; i < multiSet.values.size(); i++) { + byte[] k = multiSet.values.get(i).getKey(); + if (k == null) { + promise.setFailure( + new PException("Invalid parameter: values[" + i + "].key should not be null")); + return promise; + } + byte[] v = multiSet.values.get(i).getValue(); + if (v == null) { + promise.setFailure( + new PException("Invalid parameter: values[" + i + "].value should not be null")); + return promise; + } + values_blob.add(new key_value(new blob(k), new blob(v))); + } + int expireTsSseconds = + (multiSet.ttlSeconds == 0 ? 0 : multiSet.ttlSeconds + (int) Tools.epoch_now()); + multi_put_request request = new multi_put_request(hash_key_blob, values_blob, expireTsSseconds); + + long partitionHash = table.getKeyHash(multiSet.hashKey); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_multi_put_operator op = + new rrdb_multi_put_operator(gpid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_multi_put_operator op2 = (rrdb_multi_put_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(multiSet.hashKey, values_blob.size()), promise, op, table, timeout); + } else if (op2.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + promise.setSuccess(null); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncDel(Delete delete, int timeout) { + final DefaultPromise promise = table.newPromise(); + blob request = new blob(PegasusClient.generateKey(delete.hashKey, delete.sortKey)); + long partitionHash = table.getHash(request.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_remove_operator op = + new rrdb_remove_operator(gpid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_remove_operator op2 = (rrdb_remove_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(delete.hashKey, delete.sortKey), promise, op, table, timeout); + } else if (op2.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + promise.setSuccess(null); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncMultiDel(MultiDelete multiDelete, int timeout) { + final DefaultPromise promise = table.newPromise(); + if (multiDelete.sortKeys == null || multiDelete.sortKeys.isEmpty()) { + promise.setFailure(new PException("Invalid parameter: sortKeys size should be at lease 1")); + return promise; + } + + List sortKeyBlobs = new ArrayList(multiDelete.sortKeys.size()); + for (int i = 0; i < multiDelete.sortKeys.size(); i++) { + byte[] sortKey = multiDelete.sortKeys.get(i); + if (sortKey == null) { + promise.setFailure( + new PException("Invalid parameter: sortKeys[" + i + "] should not be null")); + return promise; + } + sortKeyBlobs.add(new blob(sortKey)); + } + multi_remove_request request = + new multi_remove_request(new blob(multiDelete.hashKey), sortKeyBlobs, 100); + + long partitionHash = table.getKeyHash(multiDelete.hashKey); + gpid pid = table.getGpidByHash(partitionHash); + rrdb_multi_remove_operator op = + new rrdb_multi_remove_operator(pid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + public void onCompletion(client_operator clientOP) { + rrdb_multi_remove_operator op2 = (rrdb_multi_remove_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(multiDelete.hashKey, sortKeyBlobs.size()), + promise, + op, + table, + timeout); + } else if (op2.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + Validate.isTrue(op2.get_response().count == multiDelete.sortKeys.size()); + promise.setSuccess(null); + } + } + }, + timeout); + return promise; + } + + @Override + public Future asyncIncr(Increment increment, int timeout) { + final DefaultPromise promise = table.newPromise(); + + blob key = new blob(PegasusClient.generateKey(increment.hashKey, increment.sortKey)); + int expireSeconds = + (increment.ttlSeconds <= 0 + ? increment.ttlSeconds + : increment.ttlSeconds + (int) Tools.epoch_now()); + incr_request request = new incr_request(key, increment.value, expireSeconds); + long partitionHash = table.getHash(request.key.data); + gpid gpid = table.getGpidByHash(partitionHash); + rrdb_incr_operator op = + new rrdb_incr_operator(gpid, table.getTableName(), request, partitionHash); + + table.asyncOperate( + op, + new Table.ClientOPCallback() { + @Override + public void onCompletion(client_operator clientOP) { + rrdb_incr_operator op2 = (rrdb_incr_operator) clientOP; + if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { + handleReplicaException( + new Request(increment.hashKey, increment.sortKey), promise, op, table, timeout); + } else if (op2.get_response().error != 0) { + promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); + } else { + promise.setSuccess(op2.get_response().new_value); + } + } + }, + timeout); + return promise; + } + @Override public Future asyncExist(byte[] hashKey, byte[] sortKey, int timeout) { final DefaultPromise promise = table.newPromise(); @@ -914,6 +1363,310 @@ public void onCompletion(client_operator clientOP) { return promise; } + @Override + public boolean exist(Get get, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncExist(get, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(get.hashKey, get.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public byte[] get(Get get, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncGet(get, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(get.hashKey, get.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public void batchGet(BatchGet batchGet, List> results, int timeout) + throws PException { + results.clear(); + FutureGroup futureGroup = + new FutureGroup<>(batchGet.getList.size(), batchGet.forceComplete); + for (Get get : batchGet.getList) { + futureGroup.add(asyncGet(get, timeout)); + } + futureGroup.waitAllComplete(results, timeout); + } + + @Override + public MultiGetResult multiGet(MultiGet multiGet, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncMultiGet(multiGet, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(multiGet.hashKey, multiGet.sortKeys.size()), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public MultiGetResult rangeGet(RangeGet rangeGet, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncRangeGet(rangeGet, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(rangeGet.hashKey, rangeGet.maxFetchCount), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public void set(Set set, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + asyncSet(set, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(set.hashKey, set.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public int batchSet(BatchSet batchSet, List> results, int timeout) + throws PException { + if (results == null) { + throw new PException("Invalid parameter: results should not be null"); + } + results.clear(); + FutureGroup futureGroup = + new FutureGroup<>(batchSet.setList.size(), batchSet.forceComplete); + + for (Set set : batchSet.setList) { + futureGroup.add(asyncSet(set, timeout)); + } + return futureGroup.waitAllComplete(results, timeout); + } + + @Override + public void multiSet(MultiSet multiSet, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + asyncMultiSet(multiSet, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(multiSet.hashKey, multiSet.values.size()), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public void del(Delete delete, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + asyncDel(delete, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(delete.hashKey, delete.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public int batchDel(BatchDelete batchDelete, List> results, int timeout) + throws PException { + + FutureGroup futureGroup = + new FutureGroup<>(batchDelete.deleteList.size(), batchDelete.forceComplete); + + for (Delete delete : batchDelete.deleteList) { + futureGroup.add(asyncDel(delete, timeout)); + } + + return futureGroup.waitAllComplete(results, timeout); + } + + @Override + public void multiDel(MultiDelete multiDelete, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + asyncMultiDel(multiDelete, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(multiDelete.hashKey, multiDelete.sortKeys.size()), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public void rangeDel(RangeDelete rangeDelete, int timeout) throws PException { + + if (timeout <= 0) timeout = defaultTimeout; + long startTime = System.currentTimeMillis(); + long lastCheckTime = startTime; + long deadlineTime = startTime + timeout; + int count = 0; + final int maxBatchDelCount = 100; + + ScanOptions scanOptions = new ScanOptions(); + scanOptions.noValue = true; + scanOptions.startInclusive = rangeDelete.options.startInclusive; + scanOptions.stopInclusive = rangeDelete.options.stopInclusive; + scanOptions.sortKeyFilterType = rangeDelete.options.sortKeyFilterType; + scanOptions.sortKeyFilterPattern = rangeDelete.options.sortKeyFilterPattern; + + rangeDelete.options.nextSortKey = rangeDelete.startSortKey; + PegasusScannerInterface pegasusScanner = + getScanner( + rangeDelete.hashKey, rangeDelete.startSortKey, rangeDelete.stopSortKey, scanOptions); + lastCheckTime = System.currentTimeMillis(); + if (lastCheckTime >= deadlineTime) { + throw new PException( + "Getting pegasusScanner takes too long time when delete hashKey:" + + new String(rangeDelete.hashKey) + + ",startSortKey:" + + new String(rangeDelete.startSortKey) + + ",stopSortKey:" + + new String(rangeDelete.stopSortKey) + + ",timeUsed:" + + (lastCheckTime - startTime) + + ":", + new ReplicationException(error_code.error_types.ERR_TIMEOUT)); + } + + int remainingTime = (int) (deadlineTime - lastCheckTime); + List sortKeys = new ArrayList(); + try { + Pair, byte[]> pairs; + while ((pairs = pegasusScanner.next()) != null) { + sortKeys.add(pairs.getKey().getValue()); + if (sortKeys.size() == maxBatchDelCount) { + rangeDelete.options.nextSortKey = sortKeys.get(0); + asyncMultiDel(rangeDelete.hashKey, sortKeys, remainingTime) + .get(remainingTime, TimeUnit.MILLISECONDS); + lastCheckTime = System.currentTimeMillis(); + remainingTime = (int) (deadlineTime - lastCheckTime); + if (remainingTime <= 0) { + throw new TimeoutException(); + } + count++; + sortKeys.clear(); + } + } + if (!sortKeys.isEmpty()) { + asyncMultiDel(new MultiDelete(rangeDelete.hashKey, sortKeys), remainingTime) + .get(remainingTime, TimeUnit.MILLISECONDS); + rangeDelete.options.nextSortKey = null; + } + } catch (InterruptedException | ExecutionException e) { + String nextSortKeyStr = + rangeDelete.options.nextSortKey == null + ? "" + : new String(rangeDelete.options.nextSortKey); + throw new PException( + "RangeDelete of hashKey:" + + new String(rangeDelete.hashKey) + + " from sortKey:" + + nextSortKeyStr + + "[index:" + + count * maxBatchDelCount + + "]" + + " failed:", + e); + } catch (TimeoutException e) { + String sortKey = sortKeys.isEmpty() ? null : new String(sortKeys.get(0)); + int timeUsed = (int) (System.currentTimeMillis() - startTime); + throw new PException( + "RangeDelete of hashKey:" + + new String(rangeDelete.hashKey) + + " from sortKey:" + + sortKey + + "[index:" + + count * maxBatchDelCount + + "]" + + " failed, timeUsed:" + + timeUsed, + new ReplicationException(error_code.error_types.ERR_TIMEOUT)); + } + } + + @Override + public int ttl(Get get, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncTTL(get, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, table.getTableName(), new Request(get.hashKey, get.sortKey), timeout, e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + + @Override + public long incr(Increment increment, int timeout) throws PException { + if (timeout <= 0) timeout = defaultTimeout; + try { + return asyncIncr(increment, timeout).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw PException.threadInterrupted(table.getTableName(), e); + } catch (TimeoutException e) { + throw PException.timeout( + metaList, + table.getTableName(), + new Request(increment.hashKey, increment.sortKey), + timeout, + e); + } catch (ExecutionException e) { + throw new PException(e); + } + } + @Override public boolean exist(byte[] hashKey, byte[] sortKey, int timeout) throws PException { if (timeout <= 0) timeout = defaultTimeout; @@ -1241,7 +1994,7 @@ public void batchSet(List items, int timeout) throws PException { for (SetItem i : items) { group.add(asyncSet(i.hashKey, i.sortKey, i.value, i.ttlSeconds, timeout)); } - group.waitAllCompleteOrOneFail(timeout); + group.waitAllComplete(timeout); } @Override diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java index bc51ea7d..17af6a95 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTableInterface.java @@ -3,6 +3,18 @@ // can be found in the LICENSE file in the root directory of this source tree. package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.client.request.BatchDelete; +import com.xiaomi.infra.pegasus.client.request.BatchGet; +import com.xiaomi.infra.pegasus.client.request.BatchSet; +import com.xiaomi.infra.pegasus.client.request.Delete; +import com.xiaomi.infra.pegasus.client.request.Get; +import com.xiaomi.infra.pegasus.client.request.Increment; +import com.xiaomi.infra.pegasus.client.request.MultiDelete; +import com.xiaomi.infra.pegasus.client.request.MultiGet; +import com.xiaomi.infra.pegasus.client.request.MultiSet; +import com.xiaomi.infra.pegasus.client.request.RangeDelete; +import com.xiaomi.infra.pegasus.client.request.RangeGet; +import com.xiaomi.infra.pegasus.client.request.Set; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.util.List; @@ -71,6 +83,27 @@ public static interface ExistListener extends GenericFutureListener future) throws Exception; } + public Future asyncExist(Get get, int timeout /*ms*/); + + public Future asyncTTL(Get get, int timeout /*ms*/); + + public Future asyncGet(Get get, int timeout /*ms*/); + + public Future asyncRangeGet(RangeGet rangeGet, int timeout /*ms*/) + throws PException; + + public Future asyncMultiGet(MultiGet multiGet, int timeout /*ms*/); + + public Future asyncMultiSet(MultiSet multiSet, int timeout /*ms*/); + + public Future asyncSet(Set set, int timeout /*ms*/); + + public Future asyncDel(Delete delete, int timeout /*ms*/); + + public Future asyncMultiDel(MultiDelete multiDelete, int timeout /*ms*/); + + public Future asyncIncr(Increment increment, int timeout /*ms*/); + /** * Check value existence for a specific (hashKey, sortKey) pair of current table, async version * @@ -89,6 +122,7 @@ public static interface ExistListener extends GenericFutureListener asyncExist(byte[] hashKey, byte[] sortKey, int timeout /*ms*/); /// < -------- SortKeyCount -------- @@ -120,6 +154,7 @@ public static interface SortKeyCountListener extends GenericFutureListener asyncSortKeyCount(byte[] hashKey, int timeout /*ms*/); /// < -------- Get -------- @@ -154,6 +189,7 @@ public static interface GetListener extends GenericFutureListener * are guaranteed to be executed as the same order as the listeners added. But listeners for * different tables are not guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncGet(byte[] hashKey, byte[] sortKey, int timeout /*ms*/); /// < -------- MultiGet -------- @@ -206,6 +242,7 @@ public static interface MultiGetListener extends GenericFutureListener asyncMultiGet( byte[] hashKey, List sortKeys, @@ -213,6 +250,7 @@ public Future asyncMultiGet( int maxFetchSize, int timeout /*ms*/); + @Deprecated public Future asyncMultiGet( byte[] hashKey, List sortKeys, int timeout /*ms*/); @@ -238,6 +276,7 @@ public Future asyncMultiGet( * the same order as the listeners added. But listeners for different tables are not * guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncMultiGet( byte[] hashKey, byte[] startSortKey, @@ -247,6 +286,7 @@ public Future asyncMultiGet( int maxFetchSize, int timeout /*ms*/); + @Deprecated public Future asyncMultiGet( byte[] hashKey, byte[] startSortKey, @@ -301,9 +341,11 @@ public static interface MultiGetSortKeysListener * the same order as the listeners added. But listeners for different tables are not * guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncMultiGetSortKeys( byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout /*ms*/); + @Deprecated public Future asyncMultiGetSortKeys(byte[] hashKey, int timeout /*ms*/); /// < -------- Set -------- @@ -340,9 +382,11 @@ public static interface SetListener extends GenericFutureListener> * are guaranteed to be executed as the same order as the listeners added. But listeners for * different tables are not guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncSet( byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /*ms*/); + @Deprecated public Future asyncSet(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/); /// < -------- MultiGet -------- @@ -377,9 +421,11 @@ public static interface MultiSetListener extends GenericFutureListener asyncMultiSet( byte[] hashKey, List> values, int ttlSeconds, int timeout /*ms*/); + @Deprecated public Future asyncMultiSet( byte[] hashKey, List> values, int timeout /*ms*/); @@ -415,6 +461,7 @@ public static interface DelListener extends GenericFutureListener> * the same order as the listeners added. But listeners for different tables are not * guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncDel(byte[] hashKey, byte[] sortKey, int timeout /*ms*/); /// < -------- MultiDel -------- @@ -448,6 +495,7 @@ public static interface MultiDelListener extends GenericFutureListener asyncMultiDel(byte[] hashKey, List sortKeys, int timeout /*ms*/); /// < -------- Incr -------- @@ -487,9 +535,11 @@ public static interface IncrListener extends GenericFutureListener> * the same order as the listeners added. But listeners for different tables are not * guaranteed to be dispatched in the same thread. */ + @Deprecated public Future asyncIncr( byte[] hashKey, byte[] sortKey, long increment, int ttlSeconds, int timeout /*ms*/); + @Deprecated public Future asyncIncr(byte[] hashKey, byte[] sortKey, long increment, int timeout /*ms*/); /// < -------- CheckAndSet -------- @@ -705,14 +755,47 @@ public static interface TTLListener extends GenericFutureListener asyncTTL(byte[] hashKey, byte[] sortKey, int timeout /*ms*/); /// < -------- Sync Methods -------- + public boolean exist(Get get, int timeout) throws PException; + + public byte[] get(Get get, int timeout) throws PException; + + public void batchGet(BatchGet batchGet, List> results, int timeout) + throws PException; + + public MultiGetResult multiGet(MultiGet multiGet, int timeout) throws PException; + + public MultiGetResult rangeGet(RangeGet rangeGet, int timeout) throws PException; + + public void set(Set set, int timeout) throws PException; + + public int batchSet(BatchSet batchSet, List> results, int timeout) + throws PException; + + public void multiSet(MultiSet multiSet, int timeout) throws PException; + + public void del(Delete delete, int timeout) throws PException; + + public int batchDel(BatchDelete batchDelete, List> results, int timeout) + throws PException; + + public void multiDel(MultiDelete multiDelete, int timeout) throws PException; + + public void rangeDel(RangeDelete rangeDelete, int timeout) throws PException; + + public int ttl(Get get, int timeout) throws PException; + + public long incr(Increment increment, int timeout) throws PException; + /** * sync version of Exist, please refer to the async version {@link #asyncExist(byte[], byte[], * int)} */ + @Deprecated public boolean exist(byte[] hashKey, byte[] sortKey, int timeout /*ms*/) throws PException; /** @@ -724,6 +807,7 @@ public static interface TTLListener extends GenericFutureListenerNotice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchGet(List> keys, List values, int timeout /*ms*/) throws PException; @@ -759,6 +844,7 @@ public void batchGet(List> keys, List values, int t *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchGet2( List> keys, List> results, int timeout /*ms*/) throws PException; @@ -767,6 +853,7 @@ public int batchGet2( * sync version of MultiGet, please refer to the async version {@link #asyncMultiGet(byte[], List, * int, int, int)} and {@link #asyncMultiGet(byte[], List, int)} */ + @Deprecated public MultiGetResult multiGet( byte[] hashKey, List sortKeys, @@ -775,6 +862,7 @@ public MultiGetResult multiGet( int timeout /*ms*/) throws PException; + @Deprecated public MultiGetResult multiGet(byte[] hashKey, List sortKeys, int timeout /*ms*/) throws PException; @@ -783,6 +871,7 @@ public MultiGetResult multiGet(byte[] hashKey, List sortKeys, int timeou * byte[], byte[], MultiGetOptions, int, int, int)} and {@link #asyncMultiGet(byte[], byte[], * byte[], MultiGetOptions, int)} */ + @Deprecated public MultiGetResult multiGet( byte[] hashKey, byte[] startSortKey, @@ -793,6 +882,7 @@ public MultiGetResult multiGet( int timeout /*ms*/) throws PException; + @Deprecated public MultiGetResult multiGet( byte[] hashKey, byte[] startSortKey, @@ -815,6 +905,7 @@ public MultiGetResult multiGet( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiGet( List>> keys, List values, int timeout /*ms*/) throws PException; @@ -834,6 +925,7 @@ public void batchMultiGet( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiGet2( List>> keys, List> results, @@ -844,9 +936,11 @@ public int batchMultiGet2( * sync version of MultiGetSortKeys, please refer to the async version {@link * #asyncMultiGetSortKeys(byte[], int, int, int)} and {@link #asyncMultiGetSortKeys(byte[], int)} */ + @Deprecated public MultiGetSortKeysResult multiGetSortKeys( byte[] hashKey, int maxFetchCount, int maxFetchSize, int timeout /*ms*/) throws PException; + @Deprecated public MultiGetSortKeysResult multiGetSortKeys(byte[] hashKey, int timeout /*ms*/) throws PException; @@ -854,9 +948,11 @@ public MultiGetSortKeysResult multiGetSortKeys(byte[] hashKey, int timeout /*ms* * sync version of Set, please refer to the async version {@link #asyncSet(byte[], byte[], byte[], * int, int)} and {@link #asyncSet(byte[], byte[], byte[], int)} */ + @Deprecated public void set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds, int timeout /*ms*/) throws PException; + @Deprecated public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/) throws PException; @@ -871,6 +967,7 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/ *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchSet(List items, int timeout /*ms*/) throws PException; /** @@ -889,6 +986,7 @@ public void set(byte[] hashKey, byte[] sortKey, byte[] value, int timeout /*ms*/ *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchSet2(List items, List results, int timeout /*ms*/) throws PException; @@ -896,10 +994,12 @@ public int batchSet2(List items, List results, int timeout * sync version of MultiSet, please refer to the async version {@link #asyncMultiSet(byte[], List, * int, int)} and {@link #asyncMultiSet(byte[], List, int)} */ + @Deprecated public void multiSet( byte[] hashKey, List> values, int ttlSeconds, int timeout /*ms*/) throws PException; + @Deprecated public void multiSet(byte[] hashKey, List> values, int timeout /*ms*/) throws PException; @@ -916,6 +1016,7 @@ public void multiSet(byte[] hashKey, List> values, int time *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiSet(List items, int ttlSeconds, int timeout /*ms*/) throws PException; @@ -937,6 +1038,7 @@ public void batchMultiSet(List items, int ttlSeconds, int timeout / *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiSet2( List items, int ttlSeconds, List results, int timeout /*ms*/) throws PException; @@ -944,6 +1046,7 @@ public int batchMultiSet2( /** * sync version of Del, please refer to the async version {@link #asyncDel(byte[], byte[], int)} */ + @Deprecated public void del(byte[] hashKey, byte[] sortKey, int timeout /*ms*/) throws PException; /** @@ -957,6 +1060,7 @@ public int batchMultiSet2( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchDel(List> keys, int timeout /*ms*/) throws PException; /** @@ -976,6 +1080,7 @@ public int batchMultiSet2( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchDel2( List> keys, List results, int timeout /*ms*/) throws PException; @@ -984,6 +1089,7 @@ public int batchDel2( * sync version of MultiDel, please refer to the async version {@link #asyncMultiDel(byte[], List, * int)} */ + @Deprecated public void multiDel(byte[] hashKey, List sortKeys, int timeout /*ms*/) throws PException; /** @@ -999,6 +1105,7 @@ public int batchDel2( * used. * @throws PException throws exception if any error occurs. */ + @Deprecated public void delRange( byte[] hashKey, byte[] startSortKey, @@ -1019,6 +1126,7 @@ public void delRange( *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public void batchMultiDel(List>> keys, int timeout /*ms*/) throws PException; @@ -1039,6 +1147,7 @@ public void batchMultiDel(List>> keys, int timeout /*m *

Notice: the method is not atomic, that means, maybe some keys succeed but some keys * failed. */ + @Deprecated public int batchMultiDel2( List>> keys, List results, int timeout /*ms*/) throws PException; @@ -1047,6 +1156,7 @@ public int batchMultiDel2( * sync version of Incr, please refer to the async version {@link #asyncIncr(byte[], byte[], long, * int, int)} */ + @Deprecated public long incr( byte[] hashKey, byte[] sortKey, long increment, int ttlSeconds, int timeout /*ms*/) throws PException; @@ -1055,6 +1165,7 @@ public long incr( * sync version of Incr, please refer to the async version {@link #asyncIncr(byte[], byte[], long, * int)} */ + @Deprecated public long incr(byte[] hashKey, byte[] sortKey, long increment, int timeout /*ms*/) throws PException; diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java new file mode 100644 index 00000000..379aae5f --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchDelete.java @@ -0,0 +1,24 @@ +package com.xiaomi.infra.pegasus.client.request; + +import java.util.ArrayList; +import java.util.List; + +public class BatchDelete { + + public boolean forceComplete; + + public List deleteList = new ArrayList<>(); + + public BatchDelete(boolean forceComplete) { + this.forceComplete = forceComplete; + } + + public BatchDelete(boolean forceComplete, List deleteList) { + this.forceComplete = forceComplete; + this.deleteList = deleteList; + } + + public void add(Delete delete) { + deleteList.add(delete); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java new file mode 100644 index 00000000..56e5e236 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchGet.java @@ -0,0 +1,23 @@ +package com.xiaomi.infra.pegasus.client.request; + +import java.util.ArrayList; +import java.util.List; + +public class BatchGet { + public boolean forceComplete; + + public List getList = new ArrayList<>(); + + public BatchGet(boolean forceComplete) { + this.forceComplete = forceComplete; + } + + public BatchGet(boolean forceComplete, List getList) { + this.forceComplete = forceComplete; + this.getList = getList; + } + + public void add(Get get) { + getList.add(get); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java new file mode 100644 index 00000000..d6ad6868 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/BatchSet.java @@ -0,0 +1,22 @@ +package com.xiaomi.infra.pegasus.client.request; + +import java.util.ArrayList; +import java.util.List; + +public class BatchSet { + public boolean forceComplete; + public List setList = new ArrayList<>(); + + public BatchSet(boolean forceComplete) { + this.forceComplete = forceComplete; + } + + public BatchSet(boolean forceComplete, List setList) { + this.forceComplete = forceComplete; + this.setList = setList; + } + + public void add(Set set) { + setList.add(set); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java new file mode 100644 index 00000000..07a83a9d --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Delete.java @@ -0,0 +1,12 @@ +package com.xiaomi.infra.pegasus.client.request; + +public class Delete extends Key { + + public Delete(byte[] hashKey) { + super(hashKey); + } + + public Delete(byte[] hashKey, byte[] sortKey) { + super(hashKey, sortKey); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java new file mode 100644 index 00000000..92579139 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Get.java @@ -0,0 +1,12 @@ +package com.xiaomi.infra.pegasus.client.request; + +public class Get extends Key { + + public Get(byte[] hashKey) { + super(hashKey); + } + + public Get(byte[] hashKey, byte[] sortKey) { + super(hashKey, sortKey); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java new file mode 100644 index 00000000..3743092d --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Increment.java @@ -0,0 +1,17 @@ +package com.xiaomi.infra.pegasus.client.request; + +public class Increment extends Key { + public int value; + public int ttlSeconds; + + public Increment(byte[] hashKey, byte[] sortKey) { + this(hashKey, sortKey, 1, 0); + } + + public Increment(byte[] hashKey, byte[] sortKey, int value, int ttlSeconds) { + super(hashKey, sortKey); + assert (ttlSeconds > -1); + this.ttlSeconds = ttlSeconds; + this.value = value; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java new file mode 100644 index 00000000..fc9ef46f --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Key.java @@ -0,0 +1,15 @@ +package com.xiaomi.infra.pegasus.client.request; + +class Key { + public byte[] hashKey = null; + public byte[] sortKey = null; + + Key(byte[] hashKey) { + this.hashKey = hashKey; + } + + Key(byte[] hashKey, byte[] sortKey) { + this.hashKey = hashKey; + this.sortKey = sortKey; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java new file mode 100644 index 00000000..c529bf66 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiDelete.java @@ -0,0 +1,23 @@ +package com.xiaomi.infra.pegasus.client.request; + +import java.util.ArrayList; +import java.util.List; + +public class MultiDelete { + public byte[] hashKey; + public List sortKeys = new ArrayList<>(); + + public MultiDelete(byte[] hashKey) { + assert (hashKey != null && hashKey.length != 0 && hashKey.length < 0xFFFF); + this.hashKey = hashKey; + } + + public MultiDelete(byte[] hashKey, List sortKeys) { + this.hashKey = hashKey; + this.sortKeys = sortKeys; + } + + public void add(byte[] sortKey) { + sortKeys.add(sortKey); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java new file mode 100644 index 00000000..52b94f28 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiGet.java @@ -0,0 +1,31 @@ +package com.xiaomi.infra.pegasus.client.request; + +import java.util.ArrayList; +import java.util.List; + +public class MultiGet { + public byte[] hashKey; + public List sortKeys; + + public int maxFetchCount; + public int maxFetchSize; + public boolean noValue; + + public MultiGet(byte[] hashKey) { + this(hashKey, new ArrayList<>(), 100, 1000, false); + } + + public MultiGet( + byte[] hashKey, List sortKeys, int maxFetchCount, int maxFetchSize, boolean noValue) { + assert (hashKey != null && hashKey.length != 0 && hashKey.length < 0xFFFF && sortKeys != null); + this.hashKey = hashKey; + this.sortKeys = sortKeys; + this.maxFetchCount = maxFetchCount; + this.maxFetchSize = maxFetchSize; + this.noValue = noValue; + } + + public void add(byte[] sortKey) { + sortKeys.add(sortKey); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java new file mode 100644 index 00000000..ea5c94b1 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/MultiSet.java @@ -0,0 +1,27 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +package com.xiaomi.infra.pegasus.client.request; + +import java.util.*; +import org.apache.commons.lang3.tuple.Pair; + +public class MultiSet { + public byte[] hashKey; + public List> values = new ArrayList<>(); + public int ttlSeconds; + + public MultiSet(byte[] hashKey) { + this(hashKey, 0); + } + + public MultiSet(byte[] hashKey, int ttlSeconds) { + assert (hashKey != null && hashKey.length > 0 && hashKey.length < 0xFFFF && ttlSeconds > 0); + this.hashKey = hashKey; + this.ttlSeconds = ttlSeconds; + } + + public void add(byte[] sortKey, byte[] value) { + values.add(Pair.of(sortKey, value)); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeDelete.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeDelete.java new file mode 100644 index 00000000..9acd044b --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeDelete.java @@ -0,0 +1,23 @@ +package com.xiaomi.infra.pegasus.client.request; + +import com.xiaomi.infra.pegasus.client.DelRangeOptions; + +public class RangeDelete { + + public byte[] hashKey; + public byte[] startSortKey; + public byte[] stopSortKey; + + public DelRangeOptions options = new DelRangeOptions(); + + public RangeDelete(byte[] hashKey) { + this(hashKey, "".getBytes(), "".getBytes()); + } + + public RangeDelete(byte[] hashKey, byte[] startSortKey, byte[] stopSortKey) { + assert (hashKey != null && hashKey.length > 0); + this.hashKey = hashKey; + this.startSortKey = startSortKey; + this.stopSortKey = stopSortKey; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeGet.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeGet.java new file mode 100644 index 00000000..7f5e6c7a --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/RangeGet.java @@ -0,0 +1,31 @@ +package com.xiaomi.infra.pegasus.client.request; + +import com.xiaomi.infra.pegasus.client.MultiGetOptions; + +public class RangeGet { + public byte[] hashKey; + public byte[] startSortKey; + public byte[] stopSortKey; + public int maxFetchCount; + public int maxFetchSize; + + public MultiGetOptions externOptions = new MultiGetOptions(); + + public RangeGet(byte[] hashKey, byte[] startSortKey, byte[] stopSortKey) { + this(hashKey, startSortKey, stopSortKey, 100, 1000); + } + + public RangeGet( + byte[] hashKey, + byte[] startSortKey, + byte[] stopSortKey, + int maxFetchCount, + int maxFetchSize) { + assert (hashKey != null && hashKey.length != 0 && hashKey.length < 0xFFFF); + this.hashKey = hashKey; + this.startSortKey = startSortKey; + this.stopSortKey = stopSortKey; + this.maxFetchCount = maxFetchCount; + this.maxFetchSize = maxFetchSize; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java b/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java new file mode 100644 index 00000000..7a8265e3 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/client/request/Set.java @@ -0,0 +1,25 @@ +// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. +package com.xiaomi.infra.pegasus.client.request; + +import java.io.Serializable; + +public class Set implements Serializable { + public byte[] hashKey; + public byte[] sortKey; + public byte[] value; + public int ttlSeconds; // 0 means no ttl + + public Set(byte[] hashKey, byte[] sortKey, byte[] value) { + this(hashKey, sortKey, value, 0); + } + + public Set(byte[] hashKey, byte[] sortKey, byte[] value, int ttlSeconds) { + assert (value != null && ttlSeconds >= 0); + this.hashKey = hashKey; + this.sortKey = sortKey; + this.value = value; + this.ttlSeconds = ttlSeconds; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 3c9605c9..998e7785 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -158,7 +158,7 @@ void initTableConfiguration(query_cfg_response resp) { // Warm up the connections during client.openTable, so RPCs thereafter can // skip the connect process. try { - futureGroup.waitAllCompleteOrOneFail(manager_.getTimeout()); + futureGroup.waitAllComplete(manager_.getTimeout()); } catch (PException e) { logger.warn("failed to connect with some replica servers: ", e); } diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java index 349ade4d..6605c4d5 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestFutureGroup.java @@ -47,7 +47,7 @@ public void testBlockingOperationException() throws Exception { FutureGroup group = new FutureGroup<>(1); group.add(promise); try { - group.waitAllCompleteOrOneFail(10000); + group.waitAllComplete(10000); } catch (PException e) { success.set(false); System.err.println(name.getMethodName() + ": " + e.toString()); @@ -78,7 +78,7 @@ public void testFutureWaitTimeout() throws Exception { group.add(promise); try { // never wake up promise. - group.waitAllCompleteOrOneFail(10); + group.waitAllComplete(10); } catch (PException e) { // must throw exception System.err.println(name.getMethodName() + ": " + e.toString());