From ad617fafe70d292b35691e575dc8b47a26502af9 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 16 Oct 2019 17:25:38 +0800 Subject: [PATCH 1/8] add more exception info --- .../infra/pegasus/client/PegasusTable.java | 107 +++++++++++++----- .../infra/pegasus/rpc/async/TableHandler.java | 28 ++++- 2 files changed, 106 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 1aaa2340..c1d4b2d6 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -10,9 +10,12 @@ import com.xiaomi.infra.pegasus.operator.*; import com.xiaomi.infra.pegasus.rpc.ReplicationException; import com.xiaomi.infra.pegasus.rpc.Table; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration; import com.xiaomi.infra.pegasus.tools.Tools; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ExecutionException; @@ -74,10 +77,10 @@ public Future asyncSortKeyCount(byte[] hashKey, int timeout) { Table.ClientOPCallback callback = new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_sortkey_count_operator op = (rrdb_sortkey_count_operator) clientOP; if (op.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(op.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (op.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op.get_response().error)); } else { @@ -101,10 +104,10 @@ public Future asyncGet(byte[] hashKey, byte[] sortKey, int timeout /* ms Table.ClientOPCallback callback = new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_get_operator gop = (rrdb_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(gop.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (gop.get_response().error == 1) { // rocksdb::kNotFound promise.setSuccess(null); } else if (gop.get_response().error != 0) { @@ -140,10 +143,10 @@ public Future asyncSet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_put_operator gop = (rrdb_put_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(gop.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (gop.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); } else { @@ -222,10 +225,10 @@ private Future asyncMultiGet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(gop.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { // rocksdb::Status::kOk && rocksdb::Status::kIncomplete promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); @@ -313,10 +316,10 @@ public Future asyncMultiGet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(gop.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { // rocksdb::Status::kOk && rocksdb::Status::kIncomplete promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); @@ -425,10 +428,10 @@ public Future asyncMultiSet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_multi_put_operator op2 = (rrdb_multi_put_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -459,10 +462,10 @@ public Future asyncDel(byte[] hashKey, byte[] sortKey, int timeout) { op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_remove_operator op2 = (rrdb_remove_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -511,10 +514,10 @@ public Future asyncMultiDel(byte[] hashKey, final List sortKeys, i table.asyncOperate( op, new Table.ClientOPCallback() { - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_multi_remove_operator op2 = (rrdb_multi_remove_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -548,10 +551,10 @@ public Future asyncIncr( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_incr_operator op2 = (rrdb_incr_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -626,10 +629,10 @@ public Future asyncCheckAndSet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (op2.get_response().error != 0 && op2.get_response().error != 13) { // 13 : kTryAgain promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); @@ -710,10 +713,10 @@ public Future asyncCheckAndMutate( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_check_and_mutate_operator op2 = (rrdb_check_and_mutate_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (op2.get_response().error != 0 && op2.get_response().error != 13) { // 13 : kTryAgain promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); @@ -794,10 +797,10 @@ public Future asyncCompareExchange( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (op2.get_response().error != 0 && op2.get_response().error != 13) { // 13 : kTryAgain promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); @@ -835,10 +838,10 @@ public Future asyncTTL(byte[] hashKey, byte[] sortKey, int timeout) { op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) { + public void onCompletion(client_operator clientOP) throws UnknownHostException { rrdb_ttl_operator op2 = (rrdb_ttl_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - promise.setFailure(new PException(new ReplicationException(op2.rpc_error.errno))); + handleReplicationException(promise, op, (TableHandler) table); } else if (op2.get_response().error != 0 && op2.get_response().error != 1) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -1640,4 +1643,56 @@ public List getUnorderedScanners(int maxSplitCount, Sca } return ret; } + + private void handleReplicationException( + DefaultPromise promise, client_operator op, TableHandler table) throws UnknownHostException { + gpid gPid = op.get_gpid(); + String tableName = table.getTableName(); + ReplicaConfiguration replicaConfiguration = table.getReplicaConfig(gPid.get_pidx()); + String replicaServer = + replicaConfiguration.primary.get_ip() + ":" + replicaConfiguration.primary.get_port(); + String operateName = op.name(); + + String message; + String header = + "[table=" + + tableName + + ",operateName=" + + operateName + + ",replicaServer=" + + replicaServer + + ",gPid=(" + + gPid.toString() + + ")" + + "]"; + switch (op.rpc_error.errno) { + case ERR_SESSION_RESET: + message = "The replica server address is error, please confirm the address!"; + promise.setFailure( + new PException(new ReplicationException(op.rpc_error.errno, header + message))); + break; + case ERR_TIMEOUT: + message = "The operation is time out!"; + promise.setFailure( + new PException(new ReplicationException(op.rpc_error.errno, header + message))); + break; + case ERR_OBJECT_NOT_FOUND: + message = "The replica server doesn't serve this gPid!"; + promise.setFailure( + new PException(new ReplicationException(op.rpc_error.errno, header + message))); + break; + case ERR_BUSY: + message = "The replica server is busy, maybe your table sets write_throttling!"; + promise.setFailure( + new PException(new ReplicationException(op.rpc_error.errno, header + message))); + break; + case ERR_INVALID_STATE: + message = "The replica server is not primary!"; + promise.setFailure( + new PException(new ReplicationException(op.rpc_error.errno, header + message))); + break; + default: + promise.setFailure(new PException(new ReplicationException(op.rpc_error.errno, header))); + } + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index c01cb45c..d6c372cd 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -16,6 +16,7 @@ import com.xiaomi.infra.pegasus.rpc.Table; import io.netty.util.concurrent.*; import java.util.ArrayList; +import java.util.Arrays; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -24,7 +25,7 @@ /** Created by sunweijie@xiaomi.com on 16-11-11. */ public class TableHandler extends Table { - static final class ReplicaConfiguration { + public static final class ReplicaConfiguration { public gpid pid = new gpid(); public long ballot = 0; public rpc_address primary = new rpc_address(); @@ -72,7 +73,7 @@ public TableHandler(ClusterManager mgr, String name, KeyHasher h) throws Replica error_types err = MetaSession.getMetaServiceError(op); if (err != error_types.ERR_OK) { - throw new ReplicationException(err); + handleMetaException(err, mgr, name); } query_cfg_response resp = op.get_response(); @@ -98,7 +99,7 @@ public TableHandler(ClusterManager mgr, String name, KeyHasher h) throws Replica lastQueryTime_ = 0; } - ReplicaConfiguration getReplicaConfig(int index) { + public ReplicaConfiguration getReplicaConfig(int index) { return tableConfig_.get().replicas.get(index); } @@ -423,4 +424,25 @@ public void asyncOperate(client_operator op, ClientOPCallback callback, int time new ClientRequestRound(op, callback, manager_.counterEnabled(), (long) timeoutMs); call(round, 1); } + + private void handleMetaException(error_types err_type, ClusterManager mgr, String name) + throws ReplicationException { + String metaServer = Arrays.toString(mgr.getMetaList()); + String message; + String header = "[metaServer=" + metaServer + ",tableName=" + name + "]"; + switch (err_type) { + case ERR_OBJECT_NOT_FOUND: + message = + "The table is not existed under the meta_server, please confirm the meta_server url or table name!"; + throw new ReplicationException(err_type, header + message); + case ERR_BUSY_CREATING: + message = "The table is creating, please wait a moment and retry it!"; + throw new ReplicationException(err_type, header + message); + case ERR_BUSY_DROPPING: + message = "The table is dropping, please confirm the table name!"; + throw new ReplicationException(err_type, header + message); + default: + throw new ReplicationException(err_type); + } + } } From 9f139f0e1a01a5d4ee05630e96c1a48bec38e019 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 16 Oct 2019 17:40:56 +0800 Subject: [PATCH 2/8] change message --- .../com/xiaomi/infra/pegasus/client/PegasusTable.java | 10 +++++----- .../xiaomi/infra/pegasus/rpc/async/TableHandler.java | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index c1d4b2d6..237e150a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1667,27 +1667,27 @@ private void handleReplicationException( + "]"; switch (op.rpc_error.errno) { case ERR_SESSION_RESET: - message = "The replica server address is error, please confirm the address!"; + message = " The replica can't be access, please confirm the address!"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_TIMEOUT: - message = "The operation is time out!"; + message = " The operation is time out!"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_OBJECT_NOT_FOUND: - message = "The replica server doesn't serve this gPid!"; + message = " The replica server doesn't serve this gPid!"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_BUSY: - message = "The replica server is busy, maybe your table sets write_throttling!"; + message = " The replica server is busy, maybe your table sets write_throttling!"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_INVALID_STATE: - message = "The replica server is not primary!"; + message = " The replica server is not primary!"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index d6c372cd..1d5ac064 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -433,13 +433,13 @@ private void handleMetaException(error_types err_type, ClusterManager mgr, Strin switch (err_type) { case ERR_OBJECT_NOT_FOUND: message = - "The table is not existed under the meta_server, please confirm the meta_server url or table name!"; + " The table is not existed under the meta_server, please confirm the meta_server url or table name!"; throw new ReplicationException(err_type, header + message); case ERR_BUSY_CREATING: - message = "The table is creating, please wait a moment and retry it!"; + message = " The table is creating, please wait a moment and retry it!"; throw new ReplicationException(err_type, header + message); case ERR_BUSY_DROPPING: - message = "The table is dropping, please confirm the table name!"; + message = " The table is dropping, please confirm the table name!"; throw new ReplicationException(err_type, header + message); default: throw new ReplicationException(err_type); From be8a4f757356234f8035997350da47af1b25040f Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 16 Oct 2019 17:42:04 +0800 Subject: [PATCH 3/8] change name --- src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 237e150a..ac2826cf 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1657,7 +1657,7 @@ private void handleReplicationException( String header = "[table=" + tableName - + ",operateName=" + + ",operation=" + operateName + ",replicaServer=" + replicaServer From b49d285b1420e5eed291dbff08610598c27a3da3 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Wed, 16 Oct 2019 19:10:35 +0800 Subject: [PATCH 4/8] change PException --- .../infra/pegasus/client/PegasusTable.java | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index ac2826cf..bcdbb67f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -77,7 +77,7 @@ public Future asyncSortKeyCount(byte[] hashKey, int timeout) { Table.ClientOPCallback callback = new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_sortkey_count_operator op = (rrdb_sortkey_count_operator) clientOP; if (op.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -104,7 +104,7 @@ public Future asyncGet(byte[] hashKey, byte[] sortKey, int timeout /* ms Table.ClientOPCallback callback = new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_get_operator gop = (rrdb_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -143,7 +143,7 @@ public Future asyncSet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_put_operator gop = (rrdb_put_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -225,7 +225,7 @@ private Future asyncMultiGet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -316,7 +316,7 @@ public Future asyncMultiGet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -428,7 +428,7 @@ public Future asyncMultiSet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_multi_put_operator op2 = (rrdb_multi_put_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -462,7 +462,7 @@ public Future asyncDel(byte[] hashKey, byte[] sortKey, int timeout) { op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_remove_operator op2 = (rrdb_remove_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -514,7 +514,7 @@ public Future asyncMultiDel(byte[] hashKey, final List sortKeys, i table.asyncOperate( op, new Table.ClientOPCallback() { - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_multi_remove_operator op2 = (rrdb_multi_remove_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -551,7 +551,7 @@ public Future asyncIncr( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_incr_operator op2 = (rrdb_incr_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -629,7 +629,7 @@ public Future asyncCheckAndSet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -713,7 +713,7 @@ public Future asyncCheckAndMutate( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_check_and_mutate_operator op2 = (rrdb_check_and_mutate_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -797,7 +797,7 @@ public Future asyncCompareExchange( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -838,7 +838,7 @@ public Future asyncTTL(byte[] hashKey, byte[] sortKey, int timeout) { op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws UnknownHostException { + public void onCompletion(client_operator clientOP) throws PException { rrdb_ttl_operator op2 = (rrdb_ttl_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { handleReplicationException(promise, op, (TableHandler) table); @@ -1645,12 +1645,17 @@ public List getUnorderedScanners(int maxSplitCount, Sca } private void handleReplicationException( - DefaultPromise promise, client_operator op, TableHandler table) throws UnknownHostException { + DefaultPromise promise, client_operator op, TableHandler table) throws PException { gpid gPid = op.get_gpid(); String tableName = table.getTableName(); ReplicaConfiguration replicaConfiguration = table.getReplicaConfig(gPid.get_pidx()); - String replicaServer = - replicaConfiguration.primary.get_ip() + ":" + replicaConfiguration.primary.get_port(); + String replicaServer = null; + try { + replicaServer = + replicaConfiguration.primary.get_ip() + ":" + replicaConfiguration.primary.get_port(); + } catch (UnknownHostException e) { + throw new PException(new InternalError("Get replica server address error")); + } String operateName = op.name(); String message; From eca1a03085d5198ad5cd94ceb19d0de686a343e7 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 17 Oct 2019 10:36:45 +0800 Subject: [PATCH 5/8] change PException --- .../infra/pegasus/client/PegasusTable.java | 76 +++++++++---------- .../infra/pegasus/rpc/async/TableHandler.java | 3 +- 2 files changed, 40 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index bcdbb67f..51397d6b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -77,10 +77,10 @@ public Future asyncSortKeyCount(byte[] hashKey, int timeout) { Table.ClientOPCallback callback = new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_sortkey_count_operator op = (rrdb_sortkey_count_operator) clientOP; if (op.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (op.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op.get_response().error)); } else { @@ -104,10 +104,10 @@ public Future asyncGet(byte[] hashKey, byte[] sortKey, int timeout /* ms Table.ClientOPCallback callback = new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_get_operator gop = (rrdb_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (gop.get_response().error == 1) { // rocksdb::kNotFound promise.setSuccess(null); } else if (gop.get_response().error != 0) { @@ -143,10 +143,10 @@ public Future asyncSet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_put_operator gop = (rrdb_put_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (gop.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); } else { @@ -225,10 +225,10 @@ private Future asyncMultiGet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { // rocksdb::Status::kOk && rocksdb::Status::kIncomplete promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); @@ -316,10 +316,10 @@ public Future asyncMultiGet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_multi_get_operator gop = (rrdb_multi_get_operator) clientOP; if (gop.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (gop.get_response().error != 0 && gop.get_response().error != 7) { // rocksdb::Status::kOk && rocksdb::Status::kIncomplete promise.setFailure(new PException("rocksdb error: " + gop.get_response().error)); @@ -428,10 +428,10 @@ public Future asyncMultiSet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_multi_put_operator op2 = (rrdb_multi_put_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -462,10 +462,10 @@ public Future asyncDel(byte[] hashKey, byte[] sortKey, int timeout) { op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_remove_operator op2 = (rrdb_remove_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -514,10 +514,10 @@ public Future asyncMultiDel(byte[] hashKey, final List sortKeys, i table.asyncOperate( op, new Table.ClientOPCallback() { - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_multi_remove_operator op2 = (rrdb_multi_remove_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -551,10 +551,10 @@ public Future asyncIncr( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_incr_operator op2 = (rrdb_incr_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (op2.get_response().error != 0) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -629,10 +629,10 @@ public Future asyncCheckAndSet( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (op2.get_response().error != 0 && op2.get_response().error != 13) { // 13 : kTryAgain promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); @@ -713,10 +713,10 @@ public Future asyncCheckAndMutate( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_check_and_mutate_operator op2 = (rrdb_check_and_mutate_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (op2.get_response().error != 0 && op2.get_response().error != 13) { // 13 : kTryAgain promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); @@ -797,10 +797,10 @@ public Future asyncCompareExchange( op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_check_and_set_operator op2 = (rrdb_check_and_set_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (op2.get_response().error != 0 && op2.get_response().error != 13) { // 13 : kTryAgain promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); @@ -838,10 +838,10 @@ public Future asyncTTL(byte[] hashKey, byte[] sortKey, int timeout) { op, new Table.ClientOPCallback() { @Override - public void onCompletion(client_operator clientOP) throws PException { + public void onCompletion(client_operator clientOP) { rrdb_ttl_operator op2 = (rrdb_ttl_operator) clientOP; if (op2.rpc_error.errno != error_code.error_types.ERR_OK) { - handleReplicationException(promise, op, (TableHandler) table); + handleReplicaException(promise, op, table, timeout); } else if (op2.get_response().error != 0 && op2.get_response().error != 1) { promise.setFailure(new PException("rocksdb error: " + op2.get_response().error)); } else { @@ -1644,26 +1644,26 @@ public List getUnorderedScanners(int maxSplitCount, Sca return ret; } - private void handleReplicationException( - DefaultPromise promise, client_operator op, TableHandler table) throws PException { + private void handleReplicaException( + DefaultPromise promise, client_operator op, Table table, int timeout) { gpid gPid = op.get_gpid(); - String tableName = table.getTableName(); - ReplicaConfiguration replicaConfiguration = table.getReplicaConfig(gPid.get_pidx()); + ReplicaConfiguration replicaConfiguration = + ((TableHandler) table).getReplicaConfig(gPid.get_pidx()); String replicaServer = null; try { replicaServer = replicaConfiguration.primary.get_ip() + ":" + replicaConfiguration.primary.get_port(); } catch (UnknownHostException e) { - throw new PException(new InternalError("Get replica server address error")); + promise.setFailure(new PException(e)); + return; } - String operateName = op.name(); String message; String header = "[table=" - + tableName + + table.getTableName() + ",operation=" - + operateName + + op.name() + ",replicaServer=" + replicaServer + ",gPid=(" @@ -1677,22 +1677,22 @@ private void handleReplicationException( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_TIMEOUT: - message = " The operation is time out!"; + message = " The operationTimeout is " + timeout + "ms"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_OBJECT_NOT_FOUND: - message = " The replica server doesn't serve this gPid!"; + message = " The replica server doesn't serve this partition!"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_BUSY: - message = " The replica server is busy, maybe your table sets write_throttling!"; + message = " Rate of requests exceeds the throughput limit!"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_INVALID_STATE: - message = " The replica server is not primary!"; + message = " The target replica is not primary!"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 1d5ac064..6102e016 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -74,6 +74,7 @@ public TableHandler(ClusterManager mgr, String name, KeyHasher h) throws Replica error_types err = MetaSession.getMetaServiceError(op); if (err != error_types.ERR_OK) { handleMetaException(err, mgr, name); + return; } query_cfg_response resp = op.get_response(); @@ -433,7 +434,7 @@ private void handleMetaException(error_types err_type, ClusterManager mgr, Strin switch (err_type) { case ERR_OBJECT_NOT_FOUND: message = - " The table is not existed under the meta_server, please confirm the meta_server url or table name!"; + " No such table. Please make sure your meta addresses and table name are correct!"; throw new ReplicationException(err_type, header + message); case ERR_BUSY_CREATING: message = " The table is creating, please wait a moment and retry it!"; From f4810a189a479cd37f74f3afd46e9fa6f402f1f8 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 17 Oct 2019 10:39:26 +0800 Subject: [PATCH 6/8] change PException --- src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 51397d6b..498e09c6 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1677,7 +1677,7 @@ private void handleReplicaException( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_TIMEOUT: - message = " The operationTimeout is " + timeout + "ms"; + message = " The operationTimeout is " + timeout + "ms!"; promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; From 7c5a7559037c3ca8ed03b527781d7c6d703627e2 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 17 Oct 2019 10:50:31 +0800 Subject: [PATCH 7/8] change PException --- .../infra/pegasus/client/PegasusTable.java | 16 +++------------- .../infra/pegasus/rpc/async/TableHandler.java | 11 +++++------ 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 498e09c6..bb892f0d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1658,7 +1658,7 @@ private void handleReplicaException( return; } - String message; + String message = ""; String header = "[table=" + table.getTableName() @@ -1673,31 +1673,21 @@ private void handleReplicaException( switch (op.rpc_error.errno) { case ERR_SESSION_RESET: message = " The replica can't be access, please confirm the address!"; - promise.setFailure( - new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_TIMEOUT: message = " The operationTimeout is " + timeout + "ms!"; - promise.setFailure( - new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_OBJECT_NOT_FOUND: message = " The replica server doesn't serve this partition!"; - promise.setFailure( - new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_BUSY: message = " Rate of requests exceeds the throughput limit!"; - promise.setFailure( - new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; case ERR_INVALID_STATE: message = " The target replica is not primary!"; - promise.setFailure( - new PException(new ReplicationException(op.rpc_error.errno, header + message))); break; - default: - promise.setFailure(new PException(new ReplicationException(op.rpc_error.errno, header))); } + promise.setFailure( + new PException(new ReplicationException(op.rpc_error.errno, header + message))); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 6102e016..511b9166 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -429,21 +429,20 @@ public void asyncOperate(client_operator op, ClientOPCallback callback, int time private void handleMetaException(error_types err_type, ClusterManager mgr, String name) throws ReplicationException { String metaServer = Arrays.toString(mgr.getMetaList()); - String message; + String message = ""; String header = "[metaServer=" + metaServer + ",tableName=" + name + "]"; switch (err_type) { case ERR_OBJECT_NOT_FOUND: message = " No such table. Please make sure your meta addresses and table name are correct!"; - throw new ReplicationException(err_type, header + message); + break; case ERR_BUSY_CREATING: message = " The table is creating, please wait a moment and retry it!"; - throw new ReplicationException(err_type, header + message); + break; case ERR_BUSY_DROPPING: message = " The table is dropping, please confirm the table name!"; - throw new ReplicationException(err_type, header + message); - default: - throw new ReplicationException(err_type); + break; } + throw new ReplicationException(err_type, header + message); } } From 26829796b1fd1454aa97f55710f80564db745b76 Mon Sep 17 00:00:00 2001 From: JiaShuo Date: Thu, 17 Oct 2019 11:59:41 +0800 Subject: [PATCH 8/8] change PException --- .../java/com/xiaomi/infra/pegasus/client/PegasusTable.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index bb892f0d..04e0e54e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1649,7 +1649,7 @@ private void handleReplicaException( gpid gPid = op.get_gpid(); ReplicaConfiguration replicaConfiguration = ((TableHandler) table).getReplicaConfig(gPid.get_pidx()); - String replicaServer = null; + String replicaServer; try { replicaServer = replicaConfiguration.primary.get_ip() + ":" + replicaConfiguration.primary.get_port(); @@ -1666,13 +1666,13 @@ private void handleReplicaException( + op.name() + ",replicaServer=" + replicaServer - + ",gPid=(" + + ",gpid=(" + gPid.toString() + ")" + "]"; switch (op.rpc_error.errno) { case ERR_SESSION_RESET: - message = " The replica can't be access, please confirm the address!"; + message = " Disconnected from the replica-server due to internal error!"; break; case ERR_TIMEOUT: message = " The operationTimeout is " + timeout + "ms!";