Skip to content

Commit

Permalink
[BACKPORT 2.18][#17972] CDCSDK: Add guards around updating invalid ch…
Browse files Browse the repository at this point in the history
…eckpoints during snapshot phase

Summary:
Original commit: 54ecf5d / D26526
To support resuming of snapshot on connectors using a stream created in explicit mode, we will add snapshot_op_id and snapshot_time to each snapshot record.

Additionally, when we initiate the snapshot process for a colocated table, we need to copy over the checkpoint details from the first table's snapshot row to the colocated streaming row. To
ensure we are doing this only for the first table, we were looking at the "snapshot_time" field of the streaming row; now we instead check whether the checkpoint of the streaming row is 0.0.
This is needed as a consequence of change: https://phorge.dev.yugabyte.com/D26526, where we stopped updating the checkpoint of the streaming row when a snapshot is done.
Jira: DB-7038

Test Plan: ybd --cxx-test integration-tests_cdcsdk_snapshot-test --gtest_filter CDCSDKYsqlTest.TestCheckpointUpdatedDuringSnapshot

Reviewers: vkushwaha, aagarwal

Reviewed By: vkushwaha

Subscribers: ycdcxcluster

Differential Revision: https://phorge.dev.yugabyte.com/D26664
  • Loading branch information
Adithya Bharadwaj committed Jul 6, 2023
1 parent aa98b17 commit 0c56693
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 33 deletions.
16 changes: 9 additions & 7 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,8 @@ void CDCServiceImpl::GetChanges(
bool snapshot_bootstrap = IsCDCSDKSnapshotBootstrapRequest(cdc_sdk_from_op_id);
if (record.GetCheckpointType() == IMPLICIT ||
(record.GetCheckpointType() == EXPLICIT &&
(got_explicit_checkpoint_from_request || snapshot_bootstrap))) {
((got_explicit_checkpoint_from_request && explicit_op_id != OpId::Invalid()) ||
snapshot_bootstrap))) {
bool is_snapshot = false;
bool is_colocated = tablet_peer->tablet_metadata()->colocated();
OpId snapshot_op_id = OpId::Invalid();
Expand Down Expand Up @@ -3910,9 +3911,13 @@ Result<CDCSDKCheckpointPB> CDCServiceImpl::GetLastCDCSDKCheckpoint(

const auto& row = *row_opt;

DCHECK_EQ(row.column(checkpoint_idx).type(), InternalType::kStringValue);

auto cdc_sdk_op_id = VERIFY_RESULT(OpId::FromString(row.column(checkpoint_idx).string_value()));
OpId cdc_sdk_op_id;
if (row.column(checkpoint_idx).IsNull()) {
cdc_sdk_op_id = OpId::Invalid();
} else {
DCHECK_EQ(row.column(checkpoint_idx).type(), InternalType::kStringValue);
cdc_sdk_op_id = VERIFY_RESULT(OpId::FromString(row.column(checkpoint_idx).string_value()));
}

if (row.column(last_replicated_column_idx).IsNull() &&
request_source == CDCRequestSource::CDCSDK) {
Expand Down Expand Up @@ -4240,9 +4245,6 @@ Status CDCServiceImpl::UpdateSnapshotDone(
map_value_pb, kCDCSDKSafeTime,
AsString(!cdc_sdk_checkpoint.has_snapshot_time() ? 0 : cdc_sdk_checkpoint.snapshot_time()));
client::AddMapEntryToColumn(map_value_pb, kCDCSDKActiveTime, AsString(current_time));
cdc_state->AddStringColumnValue(
req, master::kCdcCheckpoint,
OpId(cdc_sdk_checkpoint.term(), cdc_sdk_checkpoint.index()).ToString());
cdc_state->AddTimestampColumnValue(req, master::kCdcLastReplicationTime, 0);

// Also update the active_time in the streaming row.
Expand Down
12 changes: 5 additions & 7 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6567,7 +6567,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCheckpointUpdatedDuringSnapsh

// Call GetChanges after snapshot done. We should no longer see snapshot key and snapshot safe_time
// in cdc_state table.
change_resp_updated = ASSERT_RESULT(UpdateSnapshotDone(stream_id, tablets, &change_resp));
change_resp_updated = ASSERT_RESULT(UpdateSnapshotDone(stream_id, tablets));

// We should no longer be able to get the snapshot key and safe_time from 'cdc_state' table.
ASSERT_NOK(
Expand Down Expand Up @@ -7575,8 +7575,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestGetCheckpointOnStreamedColoca
ASSERT_OK(conn.ExecuteFormat("INSERT INTO test1 VALUES ($0, $1, $2)", i, i + 1, i + 2));
}

auto snapshot_done_resp =
ASSERT_RESULT(UpdateSnapshotDone(stream_id, tablets, &change_resp, req_table_id));
auto snapshot_done_resp = ASSERT_RESULT(UpdateSnapshotDone(stream_id, tablets, req_table_id));
auto checkpoint_resp =
ASSERT_RESULT(GetCDCSnapshotCheckpoint(stream_id, tablets[0].tablet_id(), req_table_id));
ASSERT_TRUE(!checkpoint_resp.has_snapshot_key() || checkpoint_resp.snapshot_key().empty());
Expand Down Expand Up @@ -7630,7 +7629,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestGetCheckpointOnAddedColocated
break;
}
}
ASSERT_RESULT(UpdateSnapshotDone(stream_id, tablets, &change_resp, req_table_id));
ASSERT_RESULT(UpdateSnapshotDone(stream_id, tablets, req_table_id));
LOG(INFO) << "Streamed snapshot records for table: test1";

for (int i = snapshot_recrods_per_table; i < 2 * snapshot_recrods_per_table; ++i) {
Expand Down Expand Up @@ -7699,8 +7698,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestGetCheckpointOnAddedColocated
}
ASSERT_EQ(seen_snapshot_records, snapshot_recrods_per_table);

added_table_change_resp = ASSERT_RESULT(
UpdateSnapshotDone(stream_id, tablets, &added_table_change_resp, added_table_id));
added_table_change_resp = ASSERT_RESULT(UpdateSnapshotDone(stream_id, tablets, added_table_id));
added_table_checkpoint_resp =
ASSERT_RESULT(GetCDCSnapshotCheckpoint(stream_id, tablets[0].tablet_id(), added_table_id));

Expand Down Expand Up @@ -7752,7 +7750,7 @@ TEST_F(
break;
}
}
ASSERT_RESULT(UpdateSnapshotDone(stream_id, tablets, &change_resp, req_table_id));
ASSERT_RESULT(UpdateSnapshotDone(stream_id, tablets, req_table_id));
LOG(INFO) << "Streamed snapshot records for table: test1";

for (int i = snapshot_recrods_per_table; i < 2 * snapshot_recrods_per_table; ++i) {
Expand Down
19 changes: 0 additions & 19 deletions src/yb/integration-tests/cdcsdk_ysql_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -889,25 +889,6 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
return tablets;
}

// Issues a GetChanges RPC whose request is prepared from the term/index of
// `change_resp`'s cdc_sdk_checkpoint together with kCDCSDKSnapshotDoneKey.
//
// Returns the RPC response on success. Returns a non-OK status if the RPC
// itself fails, or if the response carries an error (converted via
// StatusFromPB). `change_resp` is dereferenced unconditionally, so callers
// must pass a non-null pointer.
Result<GetChangesResponsePB> UpdateSnapshotDone(
    const CDCStreamId& stream_id,
    const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets,
    const GetChangesResponsePB* change_resp,
    const TableId table_id = "") {
  // Checkpoint reported by the caller-supplied response; its index and term
  // seed the new request.
  const auto& prev_checkpoint = change_resp->cdc_sdk_checkpoint();

  GetChangesRequestPB snapshot_done_req;
  PrepareChangeRequest(
      &snapshot_done_req, stream_id, tablets, 0, prev_checkpoint.index(), prev_checkpoint.term(),
      kCDCSDKSnapshotDoneKey, 0, 0, table_id);

  GetChangesResponsePB snapshot_done_resp;
  RpcController rpc;
  RETURN_NOT_OK(cdc_proxy_->GetChanges(snapshot_done_req, &snapshot_done_resp, &rpc));

  // A transport-level success can still carry an application-level error.
  if (!snapshot_done_resp.has_error()) {
    return snapshot_done_resp;
  }
  return StatusFromPB(snapshot_done_resp.error().status());
}

Result<GetChangesResponsePB> UpdateSnapshotDone(
const CDCStreamId& stream_id,
const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets,
Expand Down

0 comments on commit 0c56693

Please sign in to comment.