Logging improvements in cluster members subsystem #22677

Merged
merged 6 commits on Aug 1, 2024
Changes from all commits
47 changes: 27 additions & 20 deletions src/v/cluster/members_manager.cc
@@ -426,7 +426,7 @@ members_manager::apply_update(model::record_batch b) {
[this, update_offset](add_node_cmd cmd) {
vlog(
clusterlog.info,
"processing node add command - broker: {}, offset: {}",
"applying node add command - broker: {}, offset: {}",
cmd.value,
update_offset);
_first_node_operation_command_offset = std::min(
@@ -436,7 +436,7 @@ members_manager::apply_update(model::record_batch b) {
[this, update_offset](update_node_cfg_cmd cmd) {
vlog(
clusterlog.info,
"processing node update command - broker: {}, offset: {}",
"applying node update command - broker: {}, offset: {}",
cmd.value,
update_offset);
_first_node_operation_command_offset = std::min(
@@ -446,7 +446,7 @@ members_manager::apply_update(model::record_batch b) {
[this, update_offset](remove_node_cmd cmd) {
vlog(
clusterlog.info,
"processing node delete command - node: {}, offset: {}",
"applying node delete command - node: {}, offset: {}",
cmd.key,
update_offset);
_first_node_operation_command_offset = std::min(
@@ -905,26 +905,33 @@ ss::future<> members_manager::set_initial_state(
template<typename Cmd>
ss::future<std::error_code> members_manager::dispatch_updates_to_cores(
model::offset update_offset, Cmd cmd) {
return _members_table
.map([cmd, update_offset](members_table& mt) {
auto results = co_await _members_table.map(
[cmd = std::move(cmd), update_offset](members_table& mt) {
return mt.apply(update_offset, cmd);
})
.then([](std::vector<std::error_code> results) {
auto sentinel = results.front();
auto state_consistent = std::all_of(
results.begin(), results.end(), [sentinel](std::error_code res) {
return sentinel == res;
});

vassert(
state_consistent,
"State inconsistency across shards detected, "
"expected result: {}, have: {}",
sentinel,
results);
});

return sentinel;
auto error = results.front();
auto state_consistent = std::all_of(
results.begin(), results.end(), [error](std::error_code res) {
return error == res;
});

vassert(
state_consistent,
"State inconsistency across shards detected, "
"expected result: {}, have: {}",
error,
results);
if (error) {
vlog(
clusterlog.warn,
"error applying command with type {} at offset {} - {}",
Cmd::type,
update_offset,
error.message());
}

co_return error;
}

ss::future<> members_manager::stop() {
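
The main change in members_manager.cc converts dispatch_updates_to_cores from continuation chaining (.then) to a Seastar coroutine and adds a warn-level log whenever a command fails to apply. The cross-shard consistency check at its core is plain standard C++; the sketch below isolates that logic, replacing Seastar's sharded map() with an ordinary vector of per-shard results. check_shard_results and the four-shard setup are illustrative stand-ins, not code from the PR.

#include <algorithm>
#include <cassert>
#include <iostream>
#include <system_error>
#include <vector>

// Stand-in for the per-shard results that dispatch_updates_to_cores()
// collects via _members_table.map(); a plain vector lets the check run
// without Seastar.
std::error_code check_shard_results(const std::vector<std::error_code>& results) {
    auto error = results.front();
    // Every shard applies the same command at the same offset, so all
    // shards must report the same result; divergence means the in-memory
    // state has drifted apart between cores.
    bool state_consistent = std::all_of(
      results.begin(), results.end(),
      [error](std::error_code res) { return error == res; });
    assert(state_consistent && "state inconsistency across shards");
    if (error) {
        // Mirrors the warn-level log the PR adds for failed commands.
        std::cerr << "error applying command - " << error.message() << '\n';
    }
    return error;
}

int main() {
    std::vector<std::error_code> results(4); // four shards, all success
    std::cout << check_shard_results(results).message() << '\n';
    return 0;
}
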
37 changes: 23 additions & 14 deletions src/v/cluster/members_table.cc
@@ -79,12 +79,12 @@ members_table::get_removed_node_metadata_ref(model::node_id id) const {
}

std::error_code members_table::apply(model::offset o, add_node_cmd cmd) {
vlog(clusterlog.info, "applying node add command for: {}", cmd.value);
_version = model::revision_id(o);
auto it = _nodes.find(cmd.value.id());
if (it != _nodes.end()) {
return errc::invalid_node_operation;
}
vlog(clusterlog.info, "adding node {}", cmd.value);
_nodes.emplace(cmd.value.id(), node_metadata{.broker = cmd.value});

_waiters.notify(cmd.value.id());
@@ -105,19 +105,24 @@ void members_table::set_initial_brokers(std::vector<model::broker> brokers) {
}

std::error_code members_table::apply(model::offset o, update_node_cfg_cmd cmd) {
vlog(
clusterlog.info,
"applying update node config command for: {}",
cmd.value);
_version = model::revision_id(o);
auto it = _nodes.find(cmd.value.id());
if (it == _nodes.end()) {
return errc::node_does_not_exists;
}
vlog(clusterlog.info, "updating node configuration {}", cmd.value);
it->second.broker = std::move(cmd.value);

notify_member_updated(cmd.value.id(), model::membership_state::active);
return errc::success;
}

std::error_code members_table::apply(model::offset o, remove_node_cmd cmd) {
vlog(
clusterlog.info, "applying remove node config command for: {}", cmd.key);
_version = model::revision_id(o);
auto it = _nodes.find(cmd.key);
if (it == _nodes.end()) {
@@ -129,7 +134,6 @@ std::error_code members_table::apply(model::offset o, remove_node_cmd cmd) {
return errc::invalid_node_operation;
}

vlog(clusterlog.info, "removing node {}", cmd.key);
auto handle = _nodes.extract(it);

handle.mapped().state.set_membership_state(
@@ -143,6 +147,8 @@ std::error_code members_table::apply(model::offset o, remove_node_cmd cmd) {
std::error_code
members_table::apply(model::offset version, decommission_node_cmd cmd) {
_version = model::revision_id(version());
vlog(
clusterlog.info, "applying decommission node command for: {}", cmd.key);

if (auto it = _nodes.find(cmd.key); it != _nodes.end()) {
auto& [id, metadata] = *it;
@@ -151,11 +157,6 @@ members_table::apply(model::offset version, decommission_node_cmd cmd) {
!= model::membership_state::active) {
return errc::invalid_node_operation;
}
vlog(
clusterlog.info,
"changing node {} membership state to: {}",
id,
model::membership_state::draining);
metadata.state.set_membership_state(model::membership_state::draining);
notify_member_updated(cmd.key, model::membership_state::draining);
return errc::success;
@@ -166,6 +167,8 @@ members_table::apply(model::offset version, decommission_node_cmd cmd) {
std::error_code
members_table::apply(model::offset version, recommission_node_cmd cmd) {
_version = model::revision_id(version());
vlog(
clusterlog.info, "applying recommission node command for: {}", cmd.key);

if (auto it = _nodes.find(cmd.key); it != _nodes.end()) {
auto& [id, metadata] = *it;
@@ -174,11 +177,6 @@ members_table::apply(model::offset version, recommission_node_cmd cmd) {
!= model::membership_state::draining) {
return errc::invalid_node_operation;
}
vlog(
clusterlog.info,
"changing node {} membership state to: {}",
id,
model::membership_state::active);
metadata.state.set_membership_state(model::membership_state::active);
notify_member_updated(cmd.key, model::membership_state::active);
return errc::success;
@@ -189,6 +187,12 @@ members_table::apply(model::offset version, recommission_node_cmd cmd) {
std::error_code
members_table::apply(model::offset version, maintenance_mode_cmd cmd) {
_version = model::revision_id(version());
vlog(
clusterlog.info,
"applying maintenance mode command for: {} with enable maintenance "
"mode: {}",
cmd.key,
cmd.value);

const auto target = _nodes.find(cmd.key);
if (target == _nodes.end()) {
@@ -207,7 +211,6 @@ members_table::apply(model::offset version, maintenance_mode_cmd cmd) {
return errc::success;
}

vlog(clusterlog.info, "marking node {} not in maintenance state", id);
metadata.state.set_maintenance_state(
model::maintenance_state::inactive);
notify_maintenance_state_change(id, model::maintenance_state::inactive);
@@ -289,6 +292,11 @@ void members_table::apply_snapshot(
std::swap(old_nodes, _nodes);

for (const auto& [id, node] : snap.nodes) {
vlog(
clusterlog.trace,
"adding node {} with state {} from snapshot",
node.broker,
node.state);
_nodes.emplace(id, node_metadata{node.broker, node.state});
_waiters.notify(id);
notify_member_updated(id, node.state.get_membership_state());
@@ -297,6 +305,7 @@ void members_table::apply_snapshot(
_removed_nodes.clear();

for (const auto& [id, node] : snap.removed_nodes) {
vlog(clusterlog.trace, "removed node {} from snapshot", node.broker);
_removed_nodes.emplace(id, node_metadata{node.broker, node.state});
notify_member_updated(id, model::membership_state::removed);
}
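
Throughout members_table.cc the pattern is the same: the per-command vlog moves from the success path to the top of each apply() overload, so every command is logged before validation and rejected commands leave a trace as well (the snapshot path also gains trace-level logs per node). A toy illustration of that ordering, with a hypothetical apply_update over a plain map standing in for the members table:

#include <iostream>
#include <map>
#include <string>
#include <utility>

enum class errc { success, node_does_not_exist };

// Hypothetical miniature of members_table::apply(update_node_cfg_cmd):
// the log fires before validation, so the early-return failure below
// is still visible in the log.
errc apply_update(std::map<int, std::string>& nodes, int id, std::string cfg) {
    std::cout << "applying update node config command for: " << id << '\n';
    auto it = nodes.find(id);
    if (it == nodes.end()) {
        return errc::node_does_not_exist; // rejected, but already logged
    }
    it->second = std::move(cfg);
    return errc::success;
}

int main() {
    std::map<int, std::string> nodes{{1, "cfg-a"}};
    apply_update(nodes, 2, "cfg-b"); // fails, yet the attempt is logged
    return 0;
}
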
20 changes: 14 additions & 6 deletions tests/rptest/services/redpanda.py
@@ -3005,7 +3005,8 @@ def start_node(self,
expect_fail: bool = False,
auto_assign_node_id: bool = False,
omit_seeds_on_idx_one: bool = True,
skip_readiness_check: bool = False):
skip_readiness_check: bool = False,
node_id_override: int | None = None):
"""
Start a single instance of redpanda. This function will not return until
redpanda appears to have started successfully. If redpanda does not
@@ -3022,7 +3023,8 @@ def start_node(self,
node,
override_cfg_params,
auto_assign_node_id=auto_assign_node_id,
omit_seeds_on_idx_one=omit_seeds_on_idx_one)
omit_seeds_on_idx_one=omit_seeds_on_idx_one,
node_id_override=node_id_override)

if timeout is None:
timeout = self.node_ready_timeout_s
@@ -3936,7 +3938,8 @@ def write_node_conf_file(self,
node,
override_cfg_params=None,
auto_assign_node_id=False,
omit_seeds_on_idx_one=True):
omit_seeds_on_idx_one=True,
node_id_override: int | None = None):
"""
Write the node config file for a redpanda node: this is the YAML representation
of Redpanda's `node_config` class. Distinct from Redpanda's _cluster_ configuration
@@ -3945,7 +3948,11 @@ def write_node_conf_file(self,
node_info = {self.idx(n): n for n in self.nodes}

include_seed_servers = True
node_id = self.idx(node)
if node_id_override is not None:
assert not auto_assign_node_id, "Cannot use node id override when auto assigning node ids"
node_id = node_id_override
else:
node_id = self.idx(node)
if omit_seeds_on_idx_one and node_id == 1:
include_seed_servers = False

@@ -4579,7 +4586,8 @@ def search_log_all(self,
def wait_for_controller_snapshot(self,
node,
prev_mtime=0,
prev_start_offset=0):
prev_start_offset=0,
timeout_sec=30):
def check():
snap_path = os.path.join(self.DATA_DIR,
'redpanda/controller/0_0/snapshot')
@@ -4599,7 +4607,7 @@ def check():
so = controller_status['start_offset']
return (mtime > prev_mtime and so > prev_start_offset, (mtime, so))

return wait_until_result(check, timeout_sec=30, backoff_sec=1)
return wait_until_result(check, timeout_sec=timeout_sec, backoff_sec=1)

def _get_object_storage_report(self,
tolerate_empty_object_storage=False,
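
On the test side, start_node() and write_node_conf_file() gain a node_id_override that pins node_id in the generated node config instead of deriving it from the node's ducktape index, and wait_for_controller_snapshot() gets a configurable timeout_sec. A hypothetical usage sketch, assuming a started RedpandaService named redpanda; the node choice, id, and timeout values are illustrative:

# Restart one node with an explicit node id rather than the index-derived
# one; node_id_override cannot be combined with auto_assign_node_id.
node = redpanda.nodes[0]
redpanda.stop_node(node)
redpanda.start_node(node, node_id_override=10, auto_assign_node_id=False)

# Wait up to 60 seconds (rather than the previously hard-coded 30) for a
# newer controller snapshot to land on the node.
mtime, start_offset = redpanda.wait_for_controller_snapshot(node, timeout_sec=60)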