Skip to content

Commit

Permalink
refactor(FQDN): feather refator on idl/dsn.layer2.thrift
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jul 16, 2024
1 parent b72274f commit e232ba6
Show file tree
Hide file tree
Showing 67 changed files with 1,983 additions and 1,553 deletions.
12 changes: 6 additions & 6 deletions idl/dsn.layer2.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ struct partition_configuration
1:dsn.gpid pid;
2:i64 ballot;
3:i32 max_replica_count;
4:dsn.rpc_address primary;
5:list<dsn.rpc_address> secondaries;
6:list<dsn.rpc_address> last_drops;
4:dsn.rpc_address primary1;
5:list<dsn.rpc_address> secondaries1;
6:list<dsn.rpc_address> last_drops1;
7:i64 last_committed_decree;
8:i32 partition_flags;
9:optional dsn.host_port hp_primary;
10:optional list<dsn.host_port> hp_secondaries;
11:optional list<dsn.host_port> hp_last_drops;
9:optional dsn.host_port hp_primary1;
10:optional list<dsn.host_port> hp_secondaries1;
11:optional list<dsn.host_port> hp_last_drops1;
}

struct query_cfg_request
Expand Down
12 changes: 8 additions & 4 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ void partition_resolver_simple::query_config_reply(error_code err,
LOG_DEBUG_PREFIX("query config reply, gpid = {}, ballot = {}, primary = {}",
new_pc.pid,
new_pc.ballot,
FMT_HOST_PORT_AND_IP(new_pc, primary));
FMT_HOST_PORT_AND_IP(new_pc, primary1));

auto it2 = _config_cache.find(new_pc.pid.get_partition_index());
if (it2 == _config_cache.end()) {
Expand Down Expand Up @@ -414,14 +414,18 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
host_port partition_resolver_simple::get_host_port(const partition_configuration &pc) const
{
if (_app_is_stateful) {
return pc.hp_primary;
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
return primary;
}

if (pc.hp_last_drops.empty()) {
std::vector<host_port> last_drops;
GET_HOST_PORTS(pc, last_drops1, last_drops);
if (last_drops.empty()) {
return host_port();
}

return pc.hp_last_drops[rand::next_u32(0, pc.last_drops.size() - 1)];
return last_drops[rand::next_u32(0, last_drops.size() - 1)];
}

error_code partition_resolver_simple::get_host_port(int partition_index, /*out*/ host_port &hp)
Expand Down
56 changes: 40 additions & 16 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,19 @@ dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_na
int ready_count = 0;
for (int i = 0; i < partition_count; i++) {
const auto &pc = query_resp.partitions[i];
if (pc.hp_primary && (pc.hp_secondaries.size() + 1 >= max_replica_count)) {
ready_count++;
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (!primary) {
continue;
}

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (secondaries.size() + 1 < max_replica_count) {
continue;
}

ready_count++;
}
if (ready_count == partition_count) {
std::cout << app_name << " is ready now: (" << ready_count << "/" << partition_count
Expand Down Expand Up @@ -435,11 +445,16 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
int read_unhealthy = 0;
for (const auto &pc : pcs) {
int replica_count = 0;
if (pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (primary) {
replica_count++;
}
replica_count += pc.hp_secondaries.size();
if (pc.hp_primary) {

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
replica_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
} else if (replica_count < 2) {
Expand Down Expand Up @@ -573,13 +588,18 @@ dsn::error_code replication_ddl_client::list_nodes(const dsn::replication::node_
}

for (const auto &pc : pcs) {
if (pc.hp_primary) {
auto find = tmp_map.find(pc.hp_primary);
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (primary) {
auto find = tmp_map.find(primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (const auto &secondary : pc.hp_secondaries) {

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
auto find = tmp_map.find(secondary);
if (find != tmp_map.end()) {
find->second.secondary_count++;
Expand Down Expand Up @@ -766,14 +786,18 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int read_unhealthy = 0;
for (const auto &pc : pcs) {
int replica_count = 0;
if (pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (primary) {
replica_count++;
node_stat[pc.hp_primary].first++;
node_stat[primary].first++;
total_prim_count++;
}
replica_count += pc.hp_secondaries.size();
total_sec_count += pc.hp_secondaries.size();
if (pc.hp_primary) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
replica_count += secondaries.size();
total_sec_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
} else if (replica_count < 2) {
Expand All @@ -783,14 +807,14 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
write_unhealthy++;
read_unhealthy++;
}
for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : secondaries) {
node_stat[secondary].second++;
}
tp_details.add_row(pc.pid.get_partition_index());
tp_details.append_data(pc.ballot);
tp_details.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count));
tp_details.append_data(pc.hp_primary ? pc.hp_primary.to_string() : "-");
tp_details.append_data(fmt::format("[{}]", fmt::join(pc.hp_secondaries, ",")));
tp_details.append_data(primary ? primary.to_string() : "-");
tp_details.append_data(fmt::format("[{}]", fmt::join(secondaries, ",")));
}
mtp.add(std::move(tp_details));

Expand Down
12 changes: 6 additions & 6 deletions src/common/json_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -717,14 +717,14 @@ NON_MEMBER_JSON_SERIALIZATION(dsn::partition_configuration,
pid,
ballot,
max_replica_count,
primary,
secondaries,
last_drops,
primary1,
secondaries1,
last_drops1,
last_committed_decree,
partition_flags,
hp_primary,
hp_secondaries,
hp_last_drops)
hp_primary1,
hp_secondaries1,
hp_last_drops1)

NON_MEMBER_JSON_SERIALIZATION(dsn::app_info,
status,
Expand Down
10 changes: 7 additions & 3 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,18 @@ int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
rc.pid = pc.pid;
rc.ballot = pc.ballot;
rc.learner_signature = invalid_signature;
SET_OBJ_IP_AND_HOST_PORT(rc, primary, pc, primary);
SET_OBJ_IP_AND_HOST_PORT(rc, primary, pc, primary1);

if (node == pc.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (node == primary) {
rc.status = partition_status::PS_PRIMARY;
return true;
}

if (utils::contains(pc.hp_secondaries, node)) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (utils::contains(secondaries, node)) {
rc.status = partition_status::PS_SECONDARY;
return true;
}
Expand Down
25 changes: 17 additions & 8 deletions src/common/replication_other_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "consensus_types.h"
#include "replica_admin_types.h"
#include "common/replication_enums.h"
#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"

Expand All @@ -53,19 +54,19 @@ typedef int64_t decree;

inline bool is_primary(const partition_configuration &pc, const host_port &node)
{
return node && pc.hp_primary == node;
return node && pc.hp_primary1 == node;
}
inline bool is_primary(const partition_configuration &pc, const rpc_address &node)
{
return node && pc.primary == node;
return node && pc.primary1 == node;
}
inline bool is_secondary(const partition_configuration &pc, const host_port &node)
{
return node && utils::contains(pc.hp_secondaries, node);
return node && utils::contains(pc.hp_secondaries1, node);
}
inline bool is_secondary(const partition_configuration &pc, const rpc_address &node)
{
return node && utils::contains(pc.secondaries, node);
return node && utils::contains(pc.secondaries1, node);
}
inline bool is_member(const partition_configuration &pc, const host_port &node)
{
Expand All @@ -79,16 +80,24 @@ inline bool is_partition_config_equal(const partition_configuration &pc1,
const partition_configuration &pc2)
{
// secondaries no need to be same order
for (const auto &pc1_secondary : pc1.hp_secondaries) {
std::vector<host_port> pc1_secondaries;
GET_HOST_PORTS(pc1, secondaries1, pc1_secondaries);
for (const auto &pc1_secondary : pc1_secondaries) {
if (!is_secondary(pc2, pc1_secondary)) {
return false;
}
}

// last_drops is not considered into equality check
host_port pc1_primary;
GET_HOST_PORT(pc1, primary1, pc1_primary);
host_port pc2_primary;
GET_HOST_PORT(pc2, primary1, pc2_primary);
std::vector<host_port> pc2_secondaries;
GET_HOST_PORTS(pc2, secondaries1, pc2_secondaries);
return pc1.ballot == pc2.ballot && pc1.pid == pc2.pid &&
pc1.max_replica_count == pc2.max_replica_count && pc1.primary == pc2.primary &&
pc1.hp_primary == pc2.hp_primary && pc1.secondaries.size() == pc2.secondaries.size() &&
pc1.hp_secondaries.size() == pc2.hp_secondaries.size() &&
pc1.max_replica_count == pc2.max_replica_count && pc1_primary == pc2_primary &&
pc1_secondaries.size() == pc2_secondaries.size() &&
pc1.last_committed_decree == pc2.last_committed_decree;
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void backup_engine::backup_app_partition(const gpid &pid)
_is_backup_failed = true;
return;
}
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
partition_primary = app->pcs[pid.get_partition_index()].hp_primary1;
}

if (!partition_primary) {
Expand Down
13 changes: 9 additions & 4 deletions src/meta/cluster_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,17 @@ bool cluster_balance_policy::get_app_migration_info(std::shared_ptr<app_state> a
info.partitions.reserve(app->pcs.size());
for (const auto &pc : app->pcs) {
std::map<host_port, partition_status::type> pstatus_map;
pstatus_map[pc.hp_primary] = partition_status::PS_PRIMARY;
if (pc.hp_secondaries.size() != pc.max_replica_count - 1) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
pstatus_map[primary] = partition_status::PS_PRIMARY;

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (secondaries.size() != pc.max_replica_count - 1) {
// partition is unhealthy
return false;
}
for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : secondaries) {
pstatus_map[secondary] = partition_status::PS_SECONDARY;
}
info.partitions.push_back(std::move(pstatus_map));
Expand Down Expand Up @@ -548,7 +553,7 @@ bool cluster_balance_policy::apply_move(const move_info &move,
// add into the migration list and selected_pid
partition_configuration pc;
pc.pid = move.pid;
SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, primary_hp);
SET_IP_AND_HOST_PORT_BY_DNS(pc, primary1, primary_hp);
list[move.pid] = generate_balancer_request(*_global_view->apps, pc, move.type, source, target);
_migration_result->emplace(
move.pid, generate_balancer_request(*_global_view->apps, pc, move.type, source, target));
Expand Down
10 changes: 5 additions & 5 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ void meta_duplication_service::check_follower_app_if_create_completed(
const host_port secondary2("localhost", 34803);

partition_configuration pc;
SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, primary);
SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, secondary1, secondary2);
SET_IP_AND_HOST_PORT_BY_DNS(pc, primary1, primary);
SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries1, secondary1, secondary2);
resp.partitions.emplace_back(pc);
}
});
Expand All @@ -562,17 +562,17 @@ void meta_duplication_service::check_follower_app_if_create_completed(
query_err = ERR_INCONSISTENT_STATE;
} else {
for (const auto &pc : resp.partitions) {
if (!pc.hp_primary) {
if (!pc.hp_primary1) {
query_err = ERR_INACTIVE_STATE;
break;
}

if (pc.hp_secondaries.empty()) {
if (pc.hp_secondaries1.empty()) {
query_err = ERR_NOT_ENOUGH_MEMBER;
break;
}

for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : pc.hp_secondaries1) {
if (!secondary) {
query_err = ERR_INACTIVE_STATE;
break;
Expand Down
6 changes: 3 additions & 3 deletions src/meta/load_balance_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ generate_balancer_request(const app_mapper &apps,
ans = "copy_secondary";
result.balance_type = balancer_request_type::copy_secondary;
result.action_list.emplace_back(
new_proposal_action(pc.hp_primary, to, config_type::CT_ADD_SECONDARY_FOR_LB));
new_proposal_action(pc.hp_primary1, to, config_type::CT_ADD_SECONDARY_FOR_LB));
result.action_list.emplace_back(
new_proposal_action(pc.hp_primary, from, config_type::CT_REMOVE));
new_proposal_action(pc.hp_primary1, from, config_type::CT_REMOVE));
break;
default:
CHECK(false, "");
Expand Down Expand Up @@ -567,7 +567,7 @@ void ford_fulkerson::update_decree(int node_id, const node_state &ns)
{
ns.for_each_primary(_app->app_id, [&, this](const gpid &pid) {
const auto &pc = _app->pcs[pid.get_partition_index()];
for (const auto &secondary : pc.hp_secondaries) {
for (const auto &secondary : pc.hp_secondaries1) {
auto i = _host_port_id.find(secondary);
CHECK(i != _host_port_id.end(), "invalid secondary: {}", secondary);
_network[node_id][i->second]++;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/meta_backup_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ void policy_context::start_backup_partition_unlocked(gpid pid)
pid, cold_backup_constant::PROGRESS_FINISHED, dsn::host_port());
return;
}
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
partition_primary = app->pcs[pid.get_partition_index()].hp_primary1;
}
if (!partition_primary) {
LOG_WARNING("{}: partition {} doesn't have a primary now, retry to backup it later",
Expand Down
8 changes: 6 additions & 2 deletions src/meta/meta_bulk_load_ingestion_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ void ingestion_context::partition_node_info::create(const partition_configuratio
{
pid = pc.pid;
std::unordered_set<host_port> current_nodes;
current_nodes.insert(pc.hp_primary);
for (const auto &secondary : pc.hp_secondaries) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
current_nodes.insert(primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
current_nodes.insert(secondary);
}
for (const auto &node : current_nodes) {
Expand Down
Loading

0 comments on commit e232ba6

Please sign in to comment.