From bd83777abb5dd900f3964f4ed15fc867baf49a06 Mon Sep 17 00:00:00 2001 From: qinzuoyan Date: Thu, 27 Dec 2018 22:53:06 +0800 Subject: [PATCH] meta: return forward address in configuration_query_by_index_response --- .../serialization_helper/dsn.layer2_types.h | 13 +++++++- src/core/core/dsn.layer2_types.cpp | 24 ++++++++++++++ .../replication/meta_server/meta_service.cpp | 32 ++++++++++++++++--- .../replication/meta_server/meta_service.h | 3 +- src/dsn.layer2.thrift | 1 + 5 files changed, 66 insertions(+), 7 deletions(-) diff --git a/include/dsn/cpp/serialization_helper/dsn.layer2_types.h b/include/dsn/cpp/serialization_helper/dsn.layer2_types.h index 7cad4ceaa2..f72c1df233 100644 --- a/include/dsn/cpp/serialization_helper/dsn.layer2_types.h +++ b/include/dsn/cpp/serialization_helper/dsn.layer2_types.h @@ -203,7 +203,12 @@ inline std::ostream &operator<<(std::ostream &out, const configuration_query_by_ typedef struct _configuration_query_by_index_response__isset { _configuration_query_by_index_response__isset() - : err(false), app_id(false), partition_count(false), is_stateful(false), partitions(false) + : err(false), + app_id(false), + partition_count(false), + is_stateful(false), + partitions(false), + forward_address(false) { } bool err : 1; @@ -211,6 +216,7 @@ typedef struct _configuration_query_by_index_response__isset bool partition_count : 1; bool is_stateful : 1; bool partitions : 1; + bool forward_address : 1; } _configuration_query_by_index_response__isset; class configuration_query_by_index_response @@ -228,6 +234,7 @@ class configuration_query_by_index_response int32_t partition_count; bool is_stateful; std::vector partitions; + ::dsn::rpc_address forward_address; _configuration_query_by_index_response__isset __isset; @@ -241,6 +248,8 @@ class configuration_query_by_index_response void __set_partitions(const std::vector &val); + void __set_forward_address(const ::dsn::rpc_address &val); + bool operator==(const configuration_query_by_index_response &rhs) const { if (!(err == rhs.err)) @@ -253,6 +262,8 @@ class configuration_query_by_index_response return false; if (!(partitions == rhs.partitions)) return false; + if (!(forward_address == rhs.forward_address)) + return false; return true; } bool operator!=(const configuration_query_by_index_response &rhs) const diff --git a/src/core/core/dsn.layer2_types.cpp b/src/core/core/dsn.layer2_types.cpp index f50d65bf0f..e42b5d4044 100644 --- a/src/core/core/dsn.layer2_types.cpp +++ b/src/core/core/dsn.layer2_types.cpp @@ -500,6 +500,11 @@ void configuration_query_by_index_response::__set_partitions( this->partitions = val; } +void configuration_query_by_index_response::__set_forward_address(const ::dsn::rpc_address &val) +{ + this->forward_address = val; +} + uint32_t configuration_query_by_index_response::read(::apache::thrift::protocol::TProtocol *iprot) { @@ -570,6 +575,14 @@ uint32_t configuration_query_by_index_response::read(::apache::thrift::protocol: xfer += iprot->skip(ftype); } break; + case 6: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->forward_address.read(iprot); + this->__isset.forward_address = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -617,6 +630,10 @@ configuration_query_by_index_response::write(::apache::thrift::protocol::TProtoc } xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("forward_address", ::apache::thrift::protocol::T_STRUCT, 6); + xfer += this->forward_address.write(oprot); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -630,6 +647,7 @@ void swap(configuration_query_by_index_response &a, configuration_query_by_index swap(a.partition_count, b.partition_count); swap(a.is_stateful, b.is_stateful); swap(a.partitions, b.partitions); + swap(a.forward_address, b.forward_address); swap(a.__isset, b.__isset); } @@ -641,6 +659,7 @@ configuration_query_by_index_response::configuration_query_by_index_response( partition_count = other32.partition_count; is_stateful = other32.is_stateful; partitions = other32.partitions; + forward_address = other32.forward_address; __isset = other32.__isset; } configuration_query_by_index_response::configuration_query_by_index_response( @@ -651,6 +670,7 @@ configuration_query_by_index_response::configuration_query_by_index_response( partition_count = std::move(other33.partition_count); is_stateful = std::move(other33.is_stateful); partitions = std::move(other33.partitions); + forward_address = std::move(other33.forward_address); __isset = std::move(other33.__isset); } configuration_query_by_index_response &configuration_query_by_index_response:: @@ -661,6 +681,7 @@ operator=(const configuration_query_by_index_response &other34) partition_count = other34.partition_count; is_stateful = other34.is_stateful; partitions = other34.partitions; + forward_address = other34.forward_address; __isset = other34.__isset; return *this; } @@ -672,6 +693,7 @@ operator=(configuration_query_by_index_response &&other35) partition_count = std::move(other35.partition_count); is_stateful = std::move(other35.is_stateful); partitions = std::move(other35.partitions); + forward_address = std::move(other35.forward_address); __isset = std::move(other35.__isset); return *this; } @@ -688,6 +710,8 @@ void configuration_query_by_index_response::printTo(std::ostream &out) const << "is_stateful=" << to_string(is_stateful); out << ", " << "partitions=" << to_string(partitions); + out << ", " + << "forward_address=" << to_string(forward_address); out << ")"; } diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index 5ce5554066..d1c2994cc1 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -321,18 +321,23 @@ void meta_service::register_rpc_handlers() RPC_CM_DDD_DIAGNOSE, "ddd_diagnose", &meta_service::ddd_diagnose); } -int meta_service::check_leader(dsn::message_ex *req) +int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address) { dsn::rpc_address leader; if (!_failure_detector->get_leader(&leader)) { - if (!req->header->context.u.is_forward_supported) + if (!req->header->context.u.is_forward_supported) { + if (forward_address != nullptr) + *forward_address = leader; return -1; + } dinfo("leader address: %s", leader.to_string()); if (!leader.is_invalid()) { dsn_rpc_forward(req, leader); return 0; } else { + if (forward_address != nullptr) + forward_address->set_invalid(); return -1; } } @@ -341,7 +346,24 @@ int meta_service::check_leader(dsn::message_ex *req) #define RPC_CHECK_STATUS(dsn_msg, response_struct) \ dinfo("rpc %s called", __FUNCTION__); \ - int result = check_leader(dsn_msg); \ + int result = check_leader(dsn_msg, nullptr); \ + if (result == 0) \ + return; \ + if (result == -1 || !_started) { \ + if (result == -1) \ + response_struct.err = ERR_FORWARD_TO_OTHERS; \ + else if (_recovering) \ + response_struct.err = ERR_UNDER_RECOVERY; \ + else \ + response_struct.err = ERR_SERVICE_NOT_ACTIVE; \ + ddebug("reject request with %s", response_struct.err.to_string()); \ + reply(dsn_msg, response_struct); \ + return; \ + } + +#define RPC_CHECK_STATUS_WITH_FORWARD(dsn_msg, response_struct) \ + dinfo("rpc %s called", __FUNCTION__); \ + int result = check_leader(dsn_msg, &response_struct.forward_address); \ if (result == 0) \ return; \ if (result == -1 || !_started) { \ @@ -485,7 +507,7 @@ void meta_service::on_query_configuration_by_node(dsn::message_ex *msg) void meta_service::on_query_configuration_by_index(dsn::message_ex *msg) { configuration_query_by_index_response response; - RPC_CHECK_STATUS(msg, response); + RPC_CHECK_STATUS_WITH_FORWARD(msg, response); configuration_query_by_index_request request; dsn::unmarshall(msg, request); @@ -588,7 +610,7 @@ void meta_service::on_start_recovery(dsn::message_ex *req) { configuration_recovery_response response; ddebug("got start recovery request, start to do recovery"); - int result = check_leader(req); + int result = check_leader(req, nullptr); if (result == 0) // request has been forwarded to others { return; diff --git a/src/dist/replication/meta_server/meta_service.h b/src/dist/replication/meta_server/meta_service.h index 72f0487a7d..d0943073c0 100644 --- a/src/dist/replication/meta_server/meta_service.h +++ b/src/dist/replication/meta_server/meta_service.h @@ -172,7 +172,8 @@ class meta_service : public serverlet // 1. the meta is leader // 0. meta isn't leader, and rpc-msg can forward to others // -1. meta isn't leader, and rpc-msg can't forward to others - int check_leader(dsn::message_ex *req); + // if return -1 and `forward_address' != nullptr, then return leader by `forward_address'. + int check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address); error_code remote_storage_initialize(); bool check_freeze() const; diff --git a/src/dsn.layer2.thrift b/src/dsn.layer2.thrift index 5043cf80ae..fd16b8e7d2 100644 --- a/src/dsn.layer2.thrift +++ b/src/dsn.layer2.thrift @@ -29,6 +29,7 @@ struct configuration_query_by_index_response 3:i32 partition_count; 4:bool is_stateful; 5:list partitions; + 6:dsn.rpc_address forward_address; } enum app_status