Skip to content

Commit

Permalink
[fix](cloud-schema-change) Write schema change jobs to a deserialized…
Browse files Browse the repository at this point in the history
… pb buffer rather than a new one (#38210)

## Proposed changes

Currently, when starting a schema change job, job info will be persists
in an new created protobuf struct rather than the original stored one,
which will lead to compation job losing and result in `failed to lease
compaction job`.

Therefore, write the newly created schema job info into the deserialized
proto buffer of the original job info to fix the job info lose.
  • Loading branch information
TangSiyang2001 authored Jul 26, 2024
1 parent 5f40c79 commit 5fc67be
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ void start_schema_change_job(MetaServiceCode& code, std::string& msg, std::strin
code = cast_as<ErrCategory::READ>(err);
return;
}
if (!job_pb.ParseFromString(job_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "pb deserialization failed";
return;
}
job_pb.mutable_idx()->CopyFrom(request->job().idx());
// FE can ensure that a tablet does not have more than one schema_change job at the same time,
// so we can directly preempt previous schema_change job.
Expand Down
71 changes: 71 additions & 0 deletions cloud/test/meta_service_job_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,18 @@

#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <functional>
#include <limits>
#include <memory>
#include <random>
#include <string>

#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service.h"
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"

namespace doris::cloud {
Expand Down Expand Up @@ -2223,4 +2228,70 @@ TEST(MetaServiceJobTest, ParallelCumuCompactionTest) {
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

TEST(MetaServiceJobTest, SchemaChangeJobPersistTest) {
auto meta_service = get_meta_service();

auto* sp = SyncPoint::get_instance();
std::unique_ptr<int, std::function<void(int*)>> defer(
(int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); });
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
ret->second = true;
});
sp->enable_processing();

brpc::Controller cntl;

int64_t table_id = 1;
int64_t index_id = 2;
int64_t partition_id = 3;
int64_t tablet_id = 4;

ASSERT_NO_FATAL_FAILURE(
create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false));

StartTabletJobResponse res;
start_compaction_job(meta_service.get(), tablet_id, "job1", "BE1", 0, 0,
TabletCompactionJobPB::CUMULATIVE, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
res.Clear();

std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string job_key =
job_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
std::string job_val;
TabletJobInfoPB job_pb;
ASSERT_EQ(txn->get(job_key, &job_val), TxnErrorCode::TXN_OK);
ASSERT_TRUE(job_pb.ParseFromString(job_val));
ASSERT_EQ(job_pb.compaction_size(), 1);
ASSERT_EQ(job_pb.compaction(0).id(), "job1");
ASSERT_EQ(job_pb.compaction(0).initiator(), "BE1");

int64_t new_tablet_id = 11;
ASSERT_NO_FATAL_FAILURE(create_tablet(meta_service.get(), table_id, index_id, partition_id,
new_tablet_id, false, true));
ASSERT_NO_FATAL_FAILURE(start_schema_change_job(meta_service.get(), table_id, index_id,
partition_id, tablet_id, new_tablet_id, "job2",
"BE1"));

long now = time(nullptr);
FinishTabletJobRequest req;
FinishTabletJobResponse finish_res_2;
req.set_action(FinishTabletJobRequest::LEASE);
auto* compaction = req.mutable_job()->add_compaction();
compaction->set_id("job1");
compaction->set_initiator("BE1");
compaction->set_lease(now + 10);
req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
meta_service->finish_tablet_job(&cntl, &req, &finish_res_2, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);

FinishTabletJobResponse finish_res;
finish_schema_change_job(meta_service.get(), tablet_id, new_tablet_id, "job1", "BE1", {},
finish_res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

} // namespace doris::cloud

0 comments on commit 5fc67be

Please sign in to comment.