Skip to content

Commit

Permalink
c/shard_placement_table: make placement_state::assigned private
Browse files Browse the repository at this point in the history
No functional changes
  • Loading branch information
ztlpn committed Jul 29, 2024
1 parent 338e9bd commit 6ae40ce
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 40 deletions.
68 changes: 37 additions & 31 deletions src/v/cluster/shard_placement_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ shard_placement_table::reconciliation_action
shard_placement_table::placement_state::get_reconciliation_action(
std::optional<model::revision_id> expected_log_revision) const {
if (!expected_log_revision) {
if (assigned) {
if (_assigned) {
return reconciliation_action::wait_for_target_update;
}
return reconciliation_action::remove_partition;
Expand All @@ -81,8 +81,8 @@ shard_placement_table::placement_state::get_reconciliation_action(
return reconciliation_action::wait_for_target_update;
}
}
if (assigned) {
if (assigned->log_revision != expected_log_revision) {
if (_assigned) {
if (_assigned->log_revision != expected_log_revision) {
return reconciliation_action::wait_for_target_update;
}
if (_next) {
Expand All @@ -94,13 +94,18 @@ shard_placement_table::placement_state::get_reconciliation_action(
}
}

void shard_placement_table::placement_state::set_assigned(
std::optional<shard_local_assignment> new_assigned) {
_assigned = std::move(new_assigned);
}

std::ostream&
operator<<(std::ostream& o, const shard_placement_table::placement_state& ps) {
fmt::print(
o,
"{{current: {}, assigned: {}, is_initial_for: {}, next: ",
ps.current,
ps.assigned,
ps._assigned,
ps._is_initial_for);
if (ps._next) {
fmt::print(
Expand Down Expand Up @@ -233,14 +238,14 @@ ss::future<> shard_placement_table::persist_shard_local_state() {
_states, 512, [this](const decltype(_states)::value_type& kv) {
const auto& [ntp, pstate] = kv;
auto f1 = ss::now();
if (pstate.assigned) {
if (pstate.assigned()) {
auto marker = assignment_marker{
.log_revision = pstate.assigned->log_revision,
.shard_revision = pstate.assigned->shard_revision,
.log_revision = pstate.assigned()->log_revision,
.shard_revision = pstate.assigned()->shard_revision,
};
f1 = _kvstore.put(
kvstore_key_space,
assignment_kvstore_key(pstate.assigned->group),
assignment_kvstore_key(pstate.assigned()->group),
serde::to_iobuf(marker));
}

Expand Down Expand Up @@ -361,10 +366,10 @@ shard_placement_table::initialize_from_kvstore(
const auto& [ntp, state] = kv;
auto& init_data = ntp2init_data.try_emplace(ntp).first->second;

if (state.assigned) {
init_data.update(s, state.assigned.value());
if (state.assigned()) {
init_data.update(s, state.assigned().value());
max_shard_revision = std::max(
max_shard_revision, state.assigned->shard_revision);
max_shard_revision, state.assigned()->shard_revision);
}

if (state.current) {
Expand Down Expand Up @@ -452,10 +457,10 @@ shard_placement_table::gather_init_states(
marker.log_revision,
marker.shard_revision);

_states[ntp_it->second].assigned = shard_local_assignment{
_states[ntp_it->second].set_assigned(shard_local_assignment{
.group = group,
.log_revision = marker.log_revision,
.shard_revision = marker.shard_revision};
.shard_revision = marker.shard_revision});
}
break;
}
Expand Down Expand Up @@ -535,19 +540,19 @@ ss::future<> shard_placement_table::scatter_init_data(
}

ss::future<> fut = ss::now();
if (state.assigned) {
if (state.assigned()) {
if (_shard != init_data.assigned.shard) {
fut = _kvstore.remove(
kvstore_key_space,
assignment_kvstore_key(state.assigned->group));
state.assigned = std::nullopt;
assignment_kvstore_key(state.assigned()->group));
state.set_assigned(std::nullopt);
} else if (!init_data.hosted.shard) {
state._is_initial_for = init_data.log_revision;
}

if (_shard >= ss::smp::count) {
// mark states on extra shards as ready to transfer
state.assigned = std::nullopt;
state.set_assigned(std::nullopt);
}
}

Expand Down Expand Up @@ -659,14 +664,14 @@ ss::future<> shard_placement_table::do_initialize_from_topic_table(
assigned, hosted_status::hosted);
_states.emplace(ntp, placement);
} else if (ss::this_shard_id() == target->shard) {
placement.assigned = assigned;
placement.set_assigned(assigned);
_states.emplace(ntp, placement);
}
} else if (ss::this_shard_id() == target->shard) {
// in other cases target shard gets the hosted marker
placement.current = shard_local_state(
assigned, hosted_status::hosted);
placement.assigned = assigned;
placement.set_assigned(assigned);
_states.emplace(ntp, placement);
}
});
Expand Down Expand Up @@ -784,10 +789,10 @@ ss::future<> shard_placement_table::set_target(
"is_initial: {}",
ntp,
as,
state.assigned,
state.assigned(),
is_initial);

state.assigned = as;
state.set_assigned(as);
if (is_initial) {
state._is_initial_for = as.log_revision;
}
Expand All @@ -804,17 +809,17 @@ ss::future<> shard_placement_table::set_target(
prev_target->shard,
[&ntp, shard_callback](shard_placement_table& other) {
auto it = other._states.find(ntp);
if (it == other._states.end() || !it->second.assigned) {
if (it == other._states.end() || !it->second.assigned()) {
return;
}

vlog(
clusterlog.trace,
"[{}] removing assigned on this shard (was: {})",
ntp,
it->second.assigned);
it->second.assigned());

it->second.assigned = std::nullopt;
it->second.set_assigned(std::nullopt);
if (it->second.is_empty()) {
// We are on a shard that was previously a target, but didn't
// get to starting the transfer.
Expand Down Expand Up @@ -900,7 +905,7 @@ ss::future<std::error_code> shard_placement_table::prepare_create(
}
auto& state = state_it->second;

if (state.assigned->log_revision != expected_log_rev) {
if (state.assigned()->log_revision != expected_log_rev) {
// assignments got updated while we were waiting for the lock
co_return errc::waiting_for_shard_placement_update;
}
Expand All @@ -911,7 +916,7 @@ ss::future<std::error_code> shard_placement_table::prepare_create(
}

// copy assigned as it may change while we are updating kvstore
auto assigned = *state.assigned;
auto assigned = *state.assigned();

if (!state.current) {
if (state._is_initial_for == expected_log_rev) {
Expand Down Expand Up @@ -1015,7 +1020,7 @@ shard_placement_table::prepare_transfer(
ret.destination = state._next->shard;
// TODO: check that _next is still waiting for our transfer
} else {
if (state.assigned) {
if (state.assigned()) {
ret.source_error = errc::waiting_for_shard_placement_update;
co_return ret;
}
Expand Down Expand Up @@ -1049,8 +1054,9 @@ shard_placement_table::prepare_transfer(
shard_placement_table& dest) {
auto dest_it = dest._states.find(ntp);
if (
dest_it == dest._states.end() || !dest_it->second.assigned
|| dest_it->second.assigned->log_revision != expected_log_rev) {
dest_it == dest._states.end() || !dest_it->second.assigned()
|| dest_it->second.assigned()->log_revision
!= expected_log_rev) {
// We are in the middle of shard_placement_table update, and
// the destination shard doesn't yet know that it is the
// destination. Wait for the update to finish.
Expand All @@ -1072,9 +1078,9 @@ shard_placement_table::prepare_transfer(

// at this point we commit to the transfer on the
// destination shard
shard_rev = dest_state.assigned.value().shard_revision;
shard_rev = dest_state.assigned().value().shard_revision;
dest_state.current = shard_local_state(
dest_state.assigned.value(), hosted_status::receiving);
dest_state.assigned().value(), hosted_status::receiving);

if (dest._persistence_enabled) {
auto marker_buf = serde::to_iobuf(current_state_marker{
Expand Down
12 changes: 9 additions & 3 deletions src/v/cluster/shard_placement_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,31 @@ class shard_placement_table

placement_state() = default;

const std::optional<shard_local_assignment>& assigned() const {
return _assigned;
}

/// Current shard-local state for this ntp. Will be non-null if
/// some kvstore state for this ntp exists on this shard.
std::optional<shard_local_state> current;
/// If non-null, the ntp is expected to exist on this shard.
std::optional<shard_local_assignment> assigned;

private:
friend class shard_placement_table;

/// If placement_state is in the _states map, then is_empty() is false.
bool is_empty() const {
return !current && !_is_initial_for && !assigned;
return !current && !_is_initial_for && !_assigned;
}

void set_assigned(std::optional<shard_local_assignment>);

struct versioned_shard {
ss::shard_id shard;
model::shard_revision_id revision;
};

/// If non-null, the ntp is expected to exist on this shard.
std::optional<shard_local_assignment> _assigned;
/// If this shard is the initial shard for some incarnation of this
/// partition on this node, this field will contain the corresponding
/// log revision. Invariant: if both _is_initial_for and current
Expand Down
12 changes: 6 additions & 6 deletions src/v/cluster/tests/shard_placement_table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -895,15 +895,15 @@ class shard_placement_test_fixture : public seastar_test {
placement.current->status,
shard_placement_table::hosted_status::hosted)
<< "ntp: " << ntp << ", shard: " << s;
ASSERT_TRUE_CORO(placement.assigned)
ASSERT_TRUE_CORO(placement.assigned())
<< "ntp: " << ntp << ", shard: " << s;
ASSERT_EQ_CORO(
placement.assigned->log_revision, meta.log_revision)
placement.assigned()->log_revision, meta.log_revision)
<< "ntp: " << ntp << ", shard: " << s;
} else {
ASSERT_TRUE_CORO(!placement.current)
<< "ntp: " << ntp << ", shard: " << s;
ASSERT_TRUE_CORO(!placement.assigned)
ASSERT_TRUE_CORO(!placement.assigned())
<< "ntp: " << ntp << ", shard: " << s;
}
}
Expand Down Expand Up @@ -958,14 +958,14 @@ class shard_placement_test_fixture : public seastar_test {
}
for (const auto& [s, placement] : shard2state) {
if (expected.target && s == expected.target->shard) {
ASSERT_TRUE_CORO(placement.assigned)
ASSERT_TRUE_CORO(placement.assigned())
<< "ntp: " << ntp << ", shard: " << s;
ASSERT_EQ_CORO(
placement.assigned->log_revision,
placement.assigned()->log_revision,
expected.target->log_revision)
<< "ntp: " << ntp << ", shard: " << s;
} else {
ASSERT_TRUE_CORO(!placement.assigned)
ASSERT_TRUE_CORO(!placement.assigned())
<< "ntp: " << ntp << ", shard: " << s;
}
}
Expand Down

0 comments on commit 6ae40ce

Please sign in to comment.