Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

refactor: simplify api using encapsulating parameters and results #124

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/format-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java
src/main/java/com/xiaomi/infra/pegasus/operator/*.java
src/main/java/com/xiaomi/infra/pegasus/tools/*.java
src/main/java/com/xiaomi/infra/pegasus/base/*.java
src/main/java/com/xiaomi/infra/pegasus/client/request/*.java
src/main/java/com/xiaomi/infra/pegasus/example/*.java
src/test/java/com/xiaomi/infra/pegasus/client/*.java
src/test/java/com/xiaomi/infra/pegasus/metrics/*.java
Expand Down
44 changes: 30 additions & 14 deletions src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,39 @@
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;

public class FutureGroup<Result> {
private boolean forceComplete;

public FutureGroup(int initialCapacity) {
asyncTasks = new ArrayList<>(initialCapacity);
this(initialCapacity, true);
}

public FutureGroup(int initialCapacity, boolean forceComplete) {
this.asyncTasks = new ArrayList<>(initialCapacity);
this.forceComplete = forceComplete;
}

public void add(Future<Result> task) {
asyncTasks.add(task);
}

public void waitAllCompleteOrOneFail(int timeoutMillis) throws PException {
waitAllCompleteOrOneFail(null, timeoutMillis);
public void waitAllComplete(int timeoutMillis) throws PException {
List<Pair<PException, Result>> results = new ArrayList<>();
waitAllComplete(results, timeoutMillis);
}

/**
* Waits until all future tasks complete but terminate if one fails.
*
* @param results is nullable, each element is the result of the Future.
* @param results .
*/
public void waitAllCompleteOrOneFail(List<Result> results, int timeoutMillis) throws PException {
public int waitAllComplete(List<Pair<PException, Result>> results, int timeoutMillis)
throws PException {
int timeLimit = timeoutMillis;
long duration = 0;
int count = 0;
for (int i = 0; i < asyncTasks.size(); i++) {
Future<Result> fu = asyncTasks.get(i);
try {
Expand All @@ -40,20 +50,26 @@ public void waitAllCompleteOrOneFail(List<Result> results, int timeoutMillis) th
throw new PException("async task #[" + i + "] await failed: " + e.toString());
}

if (fu.isSuccess() && timeLimit >= 0) {
if (results != null) {
results.set(i, fu.getNow());
}
if (timeLimit < 0) {
throw new PException(
String.format("async task #[" + i + "] failed: timeout expired (%dms)", timeoutMillis));
}

if (fu.isSuccess()) {
count++;
results.add(Pair.of(null, fu.getNow()));
} else {
Throwable cause = fu.cause();
if (cause == null) {
throw new PException(
String.format(
"async task #[" + i + "] failed: timeout expired (%dms)", timeoutMillis));
if (forceComplete) {
throw new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause);
}
throw new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause);
results.add(
Pair.of(
new PException("async task #[" + i + "] failed: " + cause.getMessage(), cause),
null));
}
}
return count;
}

private List<Future<Result>> asyncTasks;
Expand Down
101 changes: 101 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.client.PegasusTableInterface.MultiGetResult;
import com.xiaomi.infra.pegasus.client.request.BatchDelete;
import com.xiaomi.infra.pegasus.client.request.BatchGet;
import com.xiaomi.infra.pegasus.client.request.BatchSet;
import com.xiaomi.infra.pegasus.client.request.Delete;
import com.xiaomi.infra.pegasus.client.request.Get;
import com.xiaomi.infra.pegasus.client.request.Increment;
import com.xiaomi.infra.pegasus.client.request.MultiDelete;
import com.xiaomi.infra.pegasus.client.request.MultiGet;
import com.xiaomi.infra.pegasus.client.request.MultiSet;
import com.xiaomi.infra.pegasus.client.request.RangeDelete;
import com.xiaomi.infra.pegasus.client.request.RangeGet;
import com.xiaomi.infra.pegasus.client.request.Set;
import com.xiaomi.infra.pegasus.rpc.*;
import com.xiaomi.infra.pegasus.tools.Tools;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -199,6 +212,94 @@ public ClientOptions getConfiguration() {
return clientOptions;
}

@Override
public boolean exist(String tableName, Get get) throws PException {
PegasusTable tb = getTable(tableName);
return tb.exist(get, 0);
}

@Override
public byte[] get(String tableName, Get get) throws PException {
PegasusTable tb = getTable(tableName);
return tb.get(get, 0);
}

@Override
public void batchGet(String tableName, BatchGet batchGet, List<Pair<PException, byte[]>> results)
throws PException {
PegasusTable tb = getTable(tableName);
tb.batchGet(batchGet, results, 0);
}

@Override
public MultiGetResult multiGet(String tableName, MultiGet multiGet) throws PException {
PegasusTable tb = getTable(tableName);
return tb.multiGet(multiGet, 0);
}

@Override
public MultiGetResult rangeGet(String tableName, RangeGet rangeGet) throws PException {
PegasusTable tb = getTable(tableName);
return tb.rangeGet(rangeGet, 0);
}

@Override
public void set(String tableName, Set set) throws PException {
PegasusTable tb = getTable(tableName);
tb.set(set, 0);
}

@Override
public void batchSet(String tableName, BatchSet batchSet, List<Pair<PException, Void>> results)
throws PException {
PegasusTable tb = getTable(tableName);
tb.batchSet(batchSet, results, 0);
}

@Override
public void multiSet(String tableName, MultiSet multiSet) throws PException {
PegasusTable tb = getTable(tableName);
tb.multiSet(multiSet, 0);
}

@Override
public void del(String tableName, Delete delete) throws PException {
PegasusTable tb = getTable(tableName);
tb.del(delete, 0);
}

@Override
public void batchDel(
String tableName, BatchDelete batchDelete, List<Pair<PException, Void>> results)
throws PException {
PegasusTable tb = getTable(tableName);
tb.batchDel(batchDelete, results, 0);
}

@Override
public void multiDel(String tableName, MultiDelete multiDelete) throws PException {
PegasusTable tb = getTable(tableName);
tb.multiDel(multiDelete, 0);
}

@Override
public void rangeDel(String tableName, RangeDelete rangeDelete) throws PException {
PegasusTable tb = getTable(tableName);
tb.rangeDel(rangeDelete, 0);
}

@Override
public int ttl(String tableName, Get get) throws PException {
PegasusTable tb = getTable(tableName);
return tb.ttl(get, 0);
}

@Override
public long incr(String tableName, Increment increment) throws PException {
PegasusTable tb = getTable(tableName);
return tb.incr(increment, 0);
}

@Override
public boolean exist(String tableName, byte[] hashKey, byte[] sortKey) throws PException {
PegasusTable tb = getTable(tableName);
Expand Down
Loading