Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

refactor: add more exception info #59

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 63 additions & 13 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import com.xiaomi.infra.pegasus.operator.*;
import com.xiaomi.infra.pegasus.rpc.ReplicationException;
import com.xiaomi.infra.pegasus.rpc.Table;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration;
import com.xiaomi.infra.pegasus.tools.Tools;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -77,7 +80,7 @@ public Future<Long> asyncSortKeyCount(byte[] hashKey, int timeout) {
public void onCompletion(client_operator clientOP) {
rrdb_sortkey_count_operator op = (rrdb_sortkey_count_operator) clientOP;
if (op.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(op.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (op.get_response().error != 0) {
promise.setFailure(new PException("rocksdb error: " + op.get_response().error));
} else {
Expand All @@ -104,7 +107,7 @@ public Future<byte[]> asyncGet(byte[] hashKey, byte[] sortKey, int timeout /* ms
public void onCompletion(client_operator clientOP) {
rrdb_get_operator gop = (rrdb_get_operator) clientOP;
if (gop.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(gop.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (gop.get_response().error == 1) { // rocksdb::kNotFound
promise.setSuccess(null);
} else if (gop.get_response().error != 0) {
Expand Down Expand Up @@ -143,7 +146,7 @@ public Future<Void> asyncSet(
public void onCompletion(client_operator clientOP) {
rrdb_put_operator gop = (rrdb_put_operator) clientOP;
if (gop.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(gop.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (gop.get_response().error != 0) {
promise.setFailure(new PException("rocksdb error: " + gop.get_response().error));
} else {
Expand Down Expand Up @@ -225,7 +228,7 @@ private Future<MultiGetResult> asyncMultiGet(
public void onCompletion(client_operator clientOP) {
rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP;
if (gop.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(gop.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (gop.get_response().error != 0 && gop.get_response().error != 7) {
// rocksdb::Status::kOk && rocksdb::Status::kIncomplete
promise.setFailure(new PException("rocksdb error: " + gop.get_response().error));
Expand Down Expand Up @@ -316,7 +319,7 @@ public Future<MultiGetResult> asyncMultiGet(
public void onCompletion(client_operator clientOP) {
rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP;
if (gop.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(gop.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (gop.get_response().error != 0 && gop.get_response().error != 7) {
// rocksdb::Status::kOk && rocksdb::Status::kIncomplete
promise.setFailure(new PException("rocksdb error: " + gop.get_response().error));
Expand Down Expand Up @@ -428,7 +431,7 @@ public Future<Void> asyncMultiSet(
public void onCompletion(client_operator clientOP) {
rrdb_multi_put_operator op2 = (rrdb_multi_put_operator) clientOP;
if (op2.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (op2.get_response().error != 0) {
promise.setFailure(new PException("rocksdb error: " + op2.get_response().error));
} else {
Expand Down Expand Up @@ -462,7 +465,7 @@ public Future<Void> asyncDel(byte[] hashKey, byte[] sortKey, int timeout) {
public void onCompletion(client_operator clientOP) {
rrdb_remove_operator op2 = (rrdb_remove_operator) clientOP;
if (op2.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (op2.get_response().error != 0) {
promise.setFailure(new PException("rocksdb error: " + op2.get_response().error));
} else {
Expand Down Expand Up @@ -514,7 +517,7 @@ public Future<Void> asyncMultiDel(byte[] hashKey, final List<byte[]> sortKeys, i
public void onCompletion(client_operator clientOP) {
rrdb_multi_remove_operator op2 = (rrdb_multi_remove_operator) clientOP;
if (op2.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (op2.get_response().error != 0) {
promise.setFailure(new PException("rocksdb error: " + op2.get_response().error));
} else {
Expand Down Expand Up @@ -551,7 +554,7 @@ public Future<Long> asyncIncr(
public void onCompletion(client_operator clientOP) {
rrdb_incr_operator op2 = (rrdb_incr_operator) clientOP;
if (op2.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (op2.get_response().error != 0) {
promise.setFailure(new PException("rocksdb error: " + op2.get_response().error));
} else {
Expand Down Expand Up @@ -629,7 +632,7 @@ public Future<CheckAndSetResult> asyncCheckAndSet(
public void onCompletion(client_operator clientOP) {
rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP;
if (op2.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (op2.get_response().error != 0
&& op2.get_response().error != 13) { // 13 : kTryAgain
promise.setFailure(new PException("rocksdb error: " + op2.get_response().error));
Expand Down Expand Up @@ -713,7 +716,7 @@ public Future<CheckAndMutateResult> asyncCheckAndMutate(
public void onCompletion(client_operator clientOP) {
rrdb_check_and_mutate_operator op2 = (rrdb_check_and_mutate_operator) clientOP;
if (op2.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (op2.get_response().error != 0
&& op2.get_response().error != 13) { // 13 : kTryAgain
promise.setFailure(new PException("rocksdb error: " + op2.get_response().error));
Expand Down Expand Up @@ -797,7 +800,7 @@ public Future<CompareExchangeResult> asyncCompareExchange(
public void onCompletion(client_operator clientOP) {
rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP;
if (op2.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (op2.get_response().error != 0
&& op2.get_response().error != 13) { // 13 : kTryAgain
promise.setFailure(new PException("rocksdb error: " + op2.get_response().error));
Expand Down Expand Up @@ -838,7 +841,7 @@ public Future<Integer> asyncTTL(byte[] hashKey, byte[] sortKey, int timeout) {
public void onCompletion(client_operator clientOP) {
rrdb_ttl_operator op2 = (rrdb_ttl_operator) clientOP;
if (op2.rpc_error.errno != error_code.error_types.ERR_OK) {
promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno)));
handleReplicaException(promise, op, table, timeout);
} else if (op2.get_response().error != 0 && op2.get_response().error != 1) {
promise.setFailure(new PException("rocksdb error: " + op2.get_response().error));
} else {
Expand Down Expand Up @@ -1640,4 +1643,51 @@ public List<PegasusScannerInterface> getUnorderedScanners(int maxSplitCount, Sca
}
return ret;
}

private void handleReplicaException(
DefaultPromise promise, client_operator op, Table table, int timeout) {
gpid gPid = op.get_gpid();
ReplicaConfiguration replicaConfiguration =
((TableHandler) table).getReplicaConfig(gPid.get_pidx());
String replicaServer;
try {
replicaServer =
replicaConfiguration.primary.get_ip() + ":" + replicaConfiguration.primary.get_port();
} catch (UnknownHostException e) {
promise.setFailure(new PException(e));
return;
}

String message = "";
String header =
"[table="
+ table.getTableName()
+ ",operation="
+ op.name()
+ ",replicaServer="
+ replicaServer
+ ",gpid=("
+ gPid.toString()
+ ")"
+ "]";
switch (op.rpc_error.errno) {
case ERR_SESSION_RESET:
message = " Disconnected from the replica-server due to internal error!";
break;
case ERR_TIMEOUT:
message = " The operationTimeout is " + timeout + "ms!";
break;
case ERR_OBJECT_NOT_FOUND:
message = " The replica server doesn't serve this partition!";
break;
case ERR_BUSY:
message = " Rate of requests exceeds the throughput limit!";
break;
case ERR_INVALID_STATE:
message = " The target replica is not primary!";
break;
}
promise.setFailure(
new PException(new ReplicationException(op.rpc_error.errno, header + message)));
}
}
28 changes: 25 additions & 3 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.xiaomi.infra.pegasus.rpc.Table;
import io.netty.util.concurrent.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -24,7 +25,7 @@

/** Created by sunweijie@xiaomi.com on 16-11-11. */
public class TableHandler extends Table {
static final class ReplicaConfiguration {
public static final class ReplicaConfiguration {
public gpid pid = new gpid();
public long ballot = 0;
public rpc_address primary = new rpc_address();
Expand Down Expand Up @@ -72,7 +73,8 @@ public TableHandler(ClusterManager mgr, String name, KeyHasher h) throws Replica

error_types err = MetaSession.getMetaServiceError(op);
if (err != error_types.ERR_OK) {
throw new ReplicationException(err);
handleMetaException(err, mgr, name);
return;
}

query_cfg_response resp = op.get_response();
Expand All @@ -98,7 +100,7 @@ public TableHandler(ClusterManager mgr, String name, KeyHasher h) throws Replica
lastQueryTime_ = 0;
}

ReplicaConfiguration getReplicaConfig(int index) {
public ReplicaConfiguration getReplicaConfig(int index) {
return tableConfig_.get().replicas.get(index);
}

Expand Down Expand Up @@ -423,4 +425,24 @@ public void asyncOperate(client_operator op, ClientOPCallback callback, int time
new ClientRequestRound(op, callback, manager_.counterEnabled(), (long) timeoutMs);
call(round, 1);
}

private void handleMetaException(error_types err_type, ClusterManager mgr, String name)
throws ReplicationException {
String metaServer = Arrays.toString(mgr.getMetaList());
String message = "";
String header = "[metaServer=" + metaServer + ",tableName=" + name + "]";
switch (err_type) {
case ERR_OBJECT_NOT_FOUND:
message =
" No such table. Please make sure your meta addresses and table name are correct!";
break;
case ERR_BUSY_CREATING:
message = " The table is creating, please wait a moment and retry it!";
break;
case ERR_BUSY_DROPPING:
message = " The table is dropping, please confirm the table name!";
break;
}
throw new ReplicationException(err_type, header + message);
}
}