Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req
Status res = Status::OK();
try {
DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
SchemaChangeJob job(engine, agent_task_req.alter_tablet_req_v2);
SchemaChangeJob job(engine, agent_task_req.alter_tablet_req_v2,
std::to_string(agent_task_req.alter_tablet_req_v2.job_id));
status = job.process_alter_tablet(agent_task_req.alter_tablet_req_v2);
} catch (const Exception& e) {
status = e.to_status();
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
"Don't support to add materialized view by linked schema change");
}

LOG(INFO) << "schema change type, sc_sorting: " << sc_sorting
<< ", sc_directly: " << sc_directly << ", base_tablet=" << _base_tablet->tablet_id()
<< ", new_tablet=" << _new_tablet->tablet_id();

// 2. Generate historical data converter
auto sc_procedure = get_sc_procedure(changer, sc_sorting);

Expand Down
27 changes: 20 additions & 7 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
assert_cast<vectorized::ColumnNullable*>(new_col->assume_mutable().get());

new_nullable_col->change_nested_column(ref_col);
new_nullable_col->get_null_map_data().resize_fill(new_nullable_col->size());
new_nullable_col->get_null_map_data().resize_fill(ref_col->size());
} else {
// nullable to not nullable:
// suppose column `c_phone` is originally varchar(16) NOT NULL,
Expand Down Expand Up @@ -394,11 +394,22 @@ Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
return Status::DataQualityError("Null data is changed to not nullable");
}
} else {
const auto* new_null_map =
const auto& null_map_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
->get_null_map_column()
.get_data()
.data();
->get_null_map_column();
const auto& nested_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
->get_nested_column();
const auto* new_null_map = null_map_column.get_data().data();

if (null_map_column.size() != new_column->size() ||
nested_column.size() != new_column->size()) {
DCHECK(false);
return Status::InternalError(
"null_map_column size is changed, null_map_column_size={}, "
"new_column_size={}",
null_map_column.size(), new_column->size());
}

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
Expand Down Expand Up @@ -713,7 +724,7 @@ Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
}

SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
const TAlterTabletReqV2& request)
const TAlterTabletReqV2& request, const std::string& job_id)
: _local_storage_engine(local_storage_engine) {
_base_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.base_tablet_id);
_new_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.new_tablet_id);
Expand All @@ -726,6 +737,7 @@ SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
// the complete variant is constructed by reading all the sub-columns of the variant.
_new_tablet_schema = _new_tablet->tablet_schema()->copy_without_extracted_columns();
}
_job_id = job_id;
}

// In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished
Expand Down Expand Up @@ -1017,7 +1029,7 @@ Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc
int64_t* real_alter_version) {
LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet."
<< " base_tablet=" << _base_tablet->tablet_id()
<< ", new_tablet=" << _new_tablet->tablet_id();
<< ", new_tablet=" << _new_tablet->tablet_id() << ", job_id=" << _job_id;

// find end version
int32_t end_version = -1;
Expand Down Expand Up @@ -1291,6 +1303,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
// use directly schema change instead.
if (!(*sc_directly) && !(*sc_sorting)) {
// check has remote rowset
// work for cloud and cold storage
for (const auto& rs_reader : sc_params.ref_rowset_readers) {
if (!rs_reader->rowset()->is_local()) {
*sc_directly = true;
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ struct SchemaChangeParams {

class SchemaChangeJob {
public:
SchemaChangeJob(StorageEngine& local_storage_engine, const TAlterTabletReqV2& request);
SchemaChangeJob(StorageEngine& local_storage_engine, const TAlterTabletReqV2& request,
const std::string& job_id);
Status process_alter_tablet(const TAlterTabletReqV2& request);

bool tablet_in_converting(int64_t tablet_id);
Expand Down Expand Up @@ -323,5 +324,6 @@ class SchemaChangeJob {
std::shared_mutex _mutex;
std::unordered_set<int64_t> _tablet_ids_in_converting;
std::set<std::string> _supported_functions;
std::string _job_id;
};
} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
if (indexColumnMap.containsKey(SchemaChangeHandler.SHADOW_NAME_PREFIX + column.getName())) {
Column newColumn = indexColumnMap
.get(SchemaChangeHandler.SHADOW_NAME_PREFIX + column.getName());
if (newColumn.getType() != column.getType()) {
if (!newColumn.getType().equals(column.getType())) {
try {
SlotRef slot = new SlotRef(destSlotDesc);
slot.setCol(column.getName());
Expand Down