diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index 1ff21d3e8466a2..76a0a18d8020ad 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -100,16 +100,20 @@ Status CloudBaseCompaction::prepare_compact() { // tablet not found cloud_tablet()->clear_cache(); } else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) { - (dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version()); + auto* cloud_tablet = static_cast<CloudTablet*>(_tablet.get()); std::stringstream ss; ss << "failed to prepare cumu compaction. Check compaction input versions " - "failed in schema change. " + "failed in schema change. The input version end must be " + "less than or equal to alter_version. " + "The alter_version cached in BE may be stale. " "input_version_start=" << compaction_job->input_versions(0) << " input_version_end=" << compaction_job->input_versions(1) + << " current alter_version=" << cloud_tablet->alter_version() << " schema_change_alter_version=" << resp.alter_version(); std::string msg = ss.str(); LOG(WARNING) << msg; + cloud_tablet->set_alter_version(resp.alter_version()); return Status::InternalError(msg); } return st; @@ -329,16 +333,20 @@ Status CloudBaseCompaction::modify_rowsets() { if (resp.status().code() == cloud::TABLET_NOT_FOUND) { cloud_tablet()->clear_cache(); } else if (resp.status().code() == cloud::JOB_CHECK_ALTER_VERSION_FAIL) { - (dynamic_cast<CloudTablet*>(_tablet.get()))->set_alter_version(resp.alter_version()); + auto* cloud_tablet = static_cast<CloudTablet*>(_tablet.get()); std::stringstream ss; ss << "failed to prepare cumu compaction. Check compaction input versions " - "failed in schema change. " + "failed in schema change. The input version end must be " + "less than or equal to alter_version. " + "The alter_version cached in BE may be stale. " "input_version_start=" << compaction_job->input_versions(0) << " input_version_end=" << compaction_job->input_versions(1) + << " current alter_version=" << cloud_tablet->alter_version() << " schema_change_alter_version=" << resp.alter_version(); std::string msg = ss.str(); LOG(WARNING) << msg; + cloud_tablet->set_alter_version(resp.alter_version()); return Status::InternalError(msg); } return st; diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 96e06057e2bd85..0ca8a504a887f1 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -114,8 +114,8 @@ Status CloudCumulativeCompaction::prepare_compact() { compaction_job->add_input_versions(_input_rowsets.front()->start_version()); compaction_job->add_input_versions(_input_rowsets.back()->end_version()); - // Set input version range to let meta-service judge version range conflict - compaction_job->set_judge_input_versions_range(config::enable_parallel_cumu_compaction); + // Set input version range to let meta-service check version range conflict + compaction_job->set_check_input_versions_range(config::enable_parallel_cumu_compaction); cloud::StartTabletJobResponse resp; st = _engine.meta_mgr().prepare_tablet_job(job, &resp); if (!st.ok()) { diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 22f6689ff23782..b8acdb6bd38b9e 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -154,20 +154,22 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { }; if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) { auto sync_st = tablet->sync_rowsets(); - if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] { - _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id); - LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " - "tablet_id: " - << _tablet_id << " txn_id: " << _transaction_id - << ", request_version=" << _version; - return sync_st; - } if (!sync_st.ok()) { LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id << ", status=" << sync_st; _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st); return sync_st; } + if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] { + _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id); + LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " "tablet_id: " + << _tablet_id << " txn_id: " << _transaction_id + << ", request_version=" << _version; + return Status::Error<ErrorCode::INVALID_TABLET_STATE>( + "invalid tablet state {}. tablet_id={}", tablet->tablet_state(), + tablet->tablet_id()); + } } auto sync_rowset_time_us = MonotonicMicros() - t2; max_version = tablet->max_version_unlocked();
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index e743ea9b12c8ce..ad1487917b109a 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -448,7 +448,8 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); tablet->last_sync_time_s = now; - if (tablet->enable_unique_key_merge_on_write()) { + if (tablet->enable_unique_key_merge_on_write() && + tablet->tablet_state() == TABLET_RUNNING) { DeleteBitmap delete_bitmap(tablet_id); int64_t old_max_version = req.start_version() - 1; auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 213cfa4d927470..67fd9dd0a8df5c 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -110,6 +110,8 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version, // There are only two tablet_states RUNNING and NOT_READY in cloud mode // This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS. Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) { + RETURN_IF_ERROR(sync_if_not_running()); + if (query_version > 0) { std::shared_lock rlock(_meta_lock); if (_max_version >= query_version) { @@ -133,6 +135,57 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) return st; }
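Taken together, the hunks above change how an altering tablet is handled: `sync_rowsets()` now normalizes the tablet state first, delete bitmap sync is skipped for non-RUNNING MoW tablets, and the calc-delete-bitmap task (earlier in this diff) records such tablets as succeeded so the bitmap is computed later by the schema change job. A minimal standalone sketch of that deferral decision, using stub types rather than the actual Doris classes:

```cpp
#include <cstdint>
#include <iostream>

enum class TabletState { TABLET_RUNNING, TABLET_NOTREADY };

// Stub standing in for CloudTablet (assumption, not the real class).
struct Tablet {
    TabletState state = TabletState::TABLET_NOTREADY;
    bool sync_rowsets() { return true; }  // pretend the sync RPC succeeded
};

// Returns true if the caller should proceed to calculate the delete bitmap.
// A tablet under alter is deferred (reported as success), not failed.
bool prepare_delete_bitmap_calc(Tablet& tablet, int64_t tablet_id) {
    if (!tablet.sync_rowsets()) {
        std::cerr << "failed to sync rowsets, tablet_id=" << tablet_id << "\n";
        return false;  // a real sync error is reported as an error tablet
    }
    if (tablet.state != TabletState::TABLET_RUNNING) {
        // Deferred: the schema change job recalculates the bitmap later.
        std::cout << "tablet " << tablet_id << " under alter, deferring\n";
        return false;
    }
    return true;
}

int main() {
    Tablet t;
    prepare_delete_bitmap_calc(t, 10001);  // takes the "deferring" branch
}
```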
+// Sync tablet meta and all rowset meta if the tablet is not in the RUNNING state. +// This could happen when this BE hasn't finished its schema change job while another BE has already committed the job. +// It should be quite a rare situation. +Status CloudTablet::sync_if_not_running() { + if (tablet_state() == TABLET_RUNNING) { + return Status::OK(); + } + + // Serially execute sync to reduce unnecessary network overhead + std::lock_guard lock(_sync_meta_lock); + + { + std::shared_lock rlock(_meta_lock); + if (tablet_state() == TABLET_RUNNING) { + return Status::OK(); + } + } + + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta); + if (!st.ok()) { + if (st.is<ErrorCode::NOT_FOUND>()) { + clear_cache(); + } + return st; + } + + if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] { + // MoW tablets may reach here when a load runs while the schema change is in progress + return Status::OK(); + } + + TimestampedVersionTracker empty_tracker; + { + std::lock_guard wlock(_meta_lock); + RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING)); + _rs_version_map.clear(); + _stale_rs_version_map.clear(); + std::swap(_timestamped_version_tracker, empty_tracker); + _tablet_meta->clear_rowsets(); + _tablet_meta->clear_stale_rowset(); + _max_version = -1; + } + + st = _engine.meta_mgr().sync_tablet_rowsets(this); + if (st.is<ErrorCode::NOT_FOUND>()) { + clear_cache(); + } + return st; +} + TabletSchemaSPtr CloudTablet::merged_tablet_schema() const { std::shared_lock rdlock(_meta_lock); TabletSchemaSPtr target_schema; diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 43f2a3adafe721..2bd1ce475028ab 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -206,6 +206,8 @@ class CloudTablet final : public BaseTablet { static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets); + Status sync_if_not_running(); + CloudStorageEngine& _engine; // this mutex MUST ONLY be used when sync meta diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp index 80522759b84b44..67897f527032a8 100644 --- a/be/src/cloud/config.cpp +++ b/be/src/cloud/config.cpp @@ -59,4 +59,6 @@ DEFINE_mBool(save_load_error_log_to_s3, "false"); DEFINE_mInt32(sync_load_for_tablets_thread, "32"); +DEFINE_mBool(enable_new_tablet_do_compaction, "true"); + } // namespace doris::config diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h index bf041ba0fa6fc5..4a5b3e0e16a208 100644 --- a/be/src/cloud/config.h +++ b/be/src/cloud/config.h @@ -65,6 +65,7 @@ DECLARE_mInt32(tablet_sync_interval_s); // Cloud compaction config DECLARE_mInt64(min_compaction_failure_interval_ms); +DECLARE_mBool(enable_new_tablet_do_compaction); // For cloud read/write separate mode DECLARE_mInt64(base_compaction_freeze_interval_s); DECLARE_mInt64(cu_compaction_freeze_interval_s); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 22940b40206de4..d391c26fdbe06e 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1437,7 +1437,7 @@ Status BaseTablet::update_delete_bitmap_without_lock( << ", rnd:" << rnd << ", percent: " << percent; } }); - int64_t cur_version = rowset->end_version(); + int64_t cur_version = rowset->start_version(); std::vector<segment_v2::SegmentSharedPtr> segments; RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 76a12a1b5871ec..599d9c1d1423ca 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include
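The `sync_if_not_running()` method added above is essentially a serialize-then-double-check pattern around an expensive meta-service fetch: a fast unlocked state check, a serializing sync lock, a re-check under the meta lock, and only then the RPC plus a full reset of cached rowset meta. A condensed, self-contained sketch of just that locking skeleton (simplified types; the real method also clears the version tracker and rowset maps as shown above):

```cpp
#include <atomic>
#include <mutex>
#include <shared_mutex>

enum class State { RUNNING, NOT_READY };

class TabletSync {
public:
    bool sync_if_not_running() {
        if (state_.load() == State::RUNNING) return true;  // fast path

        // Serialize concurrent callers so only one pays the RPC cost.
        std::lock_guard<std::mutex> sync_lock(sync_meta_lock_);
        {
            std::shared_lock<std::shared_mutex> rlock(meta_lock_);
            if (state_.load() == State::RUNNING) return true;  // double-check
        }

        if (!fetch_remote_state_is_running()) return true;  // still altering

        {
            std::unique_lock<std::shared_mutex> wlock(meta_lock_);
            state_.store(State::RUNNING);
            // ...drop cached rowset metas here, as in the real method...
        }
        return resync_rowsets();
    }

private:
    bool fetch_remote_state_is_running() { return true; }  // stand-in for the MS RPC
    bool resync_rowsets() { return true; }                  // stand-in for the MS RPC

    std::atomic<State> state_{State::NOT_READY};
    std::mutex sync_meta_lock_;
    std::shared_mutex meta_lock_;
};
```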
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index c7265a6670bc0f..75bb973d2bc662 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -49,9 +49,18 @@ namespace doris::cloud { static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1; static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2; +// Check that the compaction input versions are valid during schema change. +// If the schema change job doesn't carry an alter version, there is no need to check, +// because the job was submitted by an old-version BE. +// The check runs in both prepare compaction and commit compaction. +// 1. For base compaction, the input end version must be +// less than or equal to alter_version. +// 2. For cumulative compaction, the input start version must be +// greater than alter_version. bool check_compaction_input_verions(const TabletCompactionJobPB& compaction, const TabletJobInfoPB& job_pb) { - if (!job_pb.schema_change().has_alter_version()) return true; + if (!job_pb.has_schema_change() || !job_pb.schema_change().has_alter_version()) return true; + // The compaction job must carry its [start_version, end_version] range DCHECK_EQ(compaction.input_versions_size(), 2) << proto_to_json(compaction); DCHECK_LE(compaction.input_versions(0), compaction.input_versions(1)) << proto_to_json(compaction); @@ -138,7 +147,7 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst } while (err == TxnErrorCode::TXN_OK) { job_pb.ParseFromString(job_val); - if (job_pb.has_schema_change() && !check_compaction_input_verions(compaction, job_pb)) { + if (!check_compaction_input_verions(compaction, job_pb)) { SS << "Check compaction input versions failed in schema change. input_version_start=" << compaction.input_versions(0) << " input_version_end=" << compaction.input_versions(1) @@ -178,8 +187,8 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst // for MOW table, so priority should be given to performing full // compaction operations and canceling other types of compaction. compactions.Clear(); - } else if (!compaction.has_judge_input_versions_range() || - !compaction.judge_input_versions_range()) { + } else if (!compaction.has_check_input_versions_range() || + !compaction.check_input_versions_range()) { // Unknown input version range, doesn't support parallel compaction of same type for (auto& c : compactions) { if (c.type() != compaction.type() && c.type() != TabletCompactionJobPB::FULL)
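The two rules spelled out in the comment above reduce to a small predicate over the input version range: during a schema change that converts versions up to alter_version, base compaction must stay entirely at or below it, and cumulative compaction entirely above it. A standalone sketch of that predicate (simplified enum and `std::optional` instead of the generated protobuf API):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

enum class CompactionType { BASE, CUMULATIVE };

// Mirrors the intent of check_compaction_input_verions: jobs without an
// alter version (submitted by old-version BEs) are always accepted.
bool check_input_versions(CompactionType type, int64_t start, int64_t end,
                          std::optional<int64_t> alter_version) {
    if (!alter_version) return true;
    if (type == CompactionType::BASE) return end <= *alter_version;
    return start > *alter_version;  // cumulative
}

int main() {
    // alter_version = 8: versions [0, 8] are converted by the schema change.
    assert(check_input_versions(CompactionType::BASE, 2, 7, 8));         // ok
    assert(!check_input_versions(CompactionType::BASE, 2, 13, 8));       // crosses alter_version
    assert(check_input_versions(CompactionType::CUMULATIVE, 9, 13, 8));  // ok
    assert(!check_input_versions(CompactionType::CUMULATIVE, 8, 13, 8)); // overlaps converted range
    assert(check_input_versions(CompactionType::BASE, 0, 5, std::nullopt)); // old BE, no check
}
```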
@@ -997,8 +1006,9 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str } // MUST check initiator to let the retried BE commit this schema_change job. - if (schema_change.id() != recorded_schema_change.id() || - schema_change.initiator() != recorded_schema_change.initiator()) { + if (request->action() == FinishTabletJobRequest::COMMIT && + (schema_change.id() != recorded_schema_change.id() || + schema_change.initiator() != recorded_schema_change.initiator())) { SS << "unmatched job id or initiator, recorded_id=" << recorded_schema_change.id() << " given_id=" << schema_change.id() << " recorded_job=" << proto_to_json(recorded_schema_change) @@ -1020,16 +1030,21 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str // Abort //========================================================================== if (request->action() == FinishTabletJobRequest::ABORT) { - // TODO(cyx) - // remove schema change - recorded_job.clear_schema_change(); - auto job_val = recorded_job.SerializeAsString(); - txn->put(job_key, job_val); - txn->remove(new_tablet_job_key); - INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id - << " key=" << hex(job_key); - - need_commit = true; + if (schema_change.new_tablet_idx().index_id() == + recorded_schema_change.new_tablet_idx().index_id() && + schema_change.new_tablet_idx().tablet_id() == + recorded_schema_change.new_tablet_idx().tablet_id()) { + // TODO(cyx) + // remove schema change + recorded_job.clear_schema_change(); + auto job_val = recorded_job.SerializeAsString(); + txn->put(job_key, job_val); + txn->remove(new_tablet_job_key); + INSTANCE_LOG(INFO) << "remove schema_change job tablet_id=" << tablet_id + << " key=" << hex(job_key); + + need_commit = true; + } return; }
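The guarded ABORT branch above keeps a stale cancel from a superseded schema change attempt from deleting the job record of a newer retry: the recorded job is only cleared when the abort request names the same new tablet. The FE-side changes below now send the new tablet index for exactly this reason. A sketch of the guard in isolation (plain structs instead of the protobuf messages):

```cpp
#include <iostream>

struct TabletIdx {
    long index_id = 0;
    long tablet_id = 0;
};

struct SchemaChangeJob {
    TabletIdx new_tablet_idx;
};

// Only clear the recorded job when the abort request refers to the same
// new tablet; otherwise a retried job with a fresh shadow tablet survives.
bool try_abort(const SchemaChangeJob& request, bool& has_recorded_job,
               const SchemaChangeJob& recorded) {
    if (request.new_tablet_idx.index_id != recorded.new_tablet_idx.index_id ||
        request.new_tablet_idx.tablet_id != recorded.new_tablet_idx.tablet_id) {
        return false;  // stale abort, leave the recorded job untouched
    }
    has_recorded_job = false;  // analogous to recorded_job.clear_schema_change()
    return true;
}

int main() {
    SchemaChangeJob recorded{{101, 5001}};
    bool has_job = true;
    SchemaChangeJob stale{{100, 4001}};    // abort from an old attempt
    std::cout << try_abort(stale, has_job, recorded) << " " << has_job << "\n";   // 0 1
    SchemaChangeJob current{{101, 5001}};  // matching abort
    std::cout << try_abort(current, has_job, recorded) << " " << has_job << "\n"; // 1 0
}
```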
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java index c1982daa44d61d..f36d9b5f370006 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java @@ -128,10 +128,15 @@ protected void onCancel() { Long partitionId = partitionEntry.getKey(); Map<Long, Long> rollupTabletIdToBaseTabletId = partitionEntry.getValue(); for (Map.Entry<Long, Long> tabletEntry : rollupTabletIdToBaseTabletId.entrySet()) { + Long rollupTabletId = tabletEntry.getKey(); Long baseTabletId = tabletEntry.getValue(); ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .removeSchemaChangeJob(dbId, tableId, baseIndexId, partitionId, baseTabletId); + .removeSchemaChangeJob(dbId, tableId, baseIndexId, rollupIndexId, + partitionId, baseTabletId, rollupTabletId); } + LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms. " + + "dbId:{}, tableId:{}, rollupIndexId:{}, partitionId:{}, tabletSize:{}", + dbId, tableId, rollupIndexId, partitionId, rollupTabletIdToBaseTabletId.size()); } break; } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java index 7023f35d3b03f4..ac80812e5b8420 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java @@ -131,10 +131,15 @@ protected void onCancel() { Long originIndexId = indexIdMap.get(shadowIndexId); Map<Long, Long> shadowTabletIdToOriginTabletId = data.getValue(); for (Map.Entry<Long, Long> entry : shadowTabletIdToOriginTabletId.entrySet()) { + Long shadowTabletId = entry.getKey(); Long originTabletId = entry.getValue(); ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .removeSchemaChangeJob(dbId, tableId, originIndexId, partitionId, originTabletId); + .removeSchemaChangeJob(dbId, tableId, originIndexId, shadowIndexId, + partitionId, originTabletId, shadowTabletId); } + LOG.info("Cancel SchemaChange. Remove SchemaChangeJob in ms. " + + "dbId:{}, tableId:{}, originIndexId:{}, partitionId:{}, tabletSize:{}", + dbId, tableId, originIndexId, partitionId, shadowTabletIdToOriginTabletId.size()); } break; } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 722cd4c15471ce..30785c4fab07eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -810,13 +810,15 @@ private void dropCloudPartition(long dbId, long tableId, List<Long> partitionIds } } - public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long partitionId, long tabletId) + public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long newIndexId, + long partitionId, long tabletId, long newTabletId) throws DdlException { Cloud.FinishTabletJobRequest.Builder finishTabletJobRequestBuilder = Cloud.FinishTabletJobRequest.newBuilder(); finishTabletJobRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); finishTabletJobRequestBuilder.setAction(Cloud.FinishTabletJobRequest.Action.ABORT); Cloud.TabletJobInfoPB.Builder tabletJobInfoPBBuilder = Cloud.TabletJobInfoPB.newBuilder(); + // set origin tablet Cloud.TabletIndexPB.Builder tabletIndexPBBuilder = Cloud.TabletIndexPB.newBuilder(); tabletIndexPBBuilder.setDbId(dbId); tabletIndexPBBuilder.setTableId(tableId); @@ -825,6 +827,23 @@ public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long pa tabletIndexPBBuilder.setTabletId(tabletId); final Cloud.TabletIndexPB tabletIndex = tabletIndexPBBuilder.build(); tabletJobInfoPBBuilder.setIdx(tabletIndex); + + // set new tablet + Cloud.TabletSchemaChangeJobPB.Builder schemaChangeJobPBBuilder = + Cloud.TabletSchemaChangeJobPB.newBuilder(); + Cloud.TabletIndexPB.Builder newtabletIndexPBBuilder = Cloud.TabletIndexPB.newBuilder(); + newtabletIndexPBBuilder.setDbId(dbId); + newtabletIndexPBBuilder.setTableId(tableId); + newtabletIndexPBBuilder.setIndexId(newIndexId); + newtabletIndexPBBuilder.setPartitionId(partitionId); + newtabletIndexPBBuilder.setTabletId(newTabletId); + final Cloud.TabletIndexPB newtabletIndex = newtabletIndexPBBuilder.build(); + schemaChangeJobPBBuilder.setNewTabletIdx(newtabletIndex); + final 
Cloud.TabletSchemaChangeJobPB tabletSchemaChangeJobPb = + schemaChangeJobPBBuilder.build(); + + tabletJobInfoPBBuilder.setSchemaChange(tabletSchemaChangeJobPb); + final Cloud.TabletJobInfoPB tabletJobInfoPB = tabletJobInfoPBBuilder.build(); finishTabletJobRequestBuilder.setJob(tabletJobInfoPB); @@ -839,7 +858,7 @@ public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long pa break; } } catch (RpcException e) { - LOG.warn("tryTimes:{}, dropIndex RpcException", tryTimes, e); + LOG.warn("tryTimes:{}, finishTabletJob RpcException", tryTimes, e); if (tryTimes + 1 >= Config.metaServiceRpcRetryTimes()) { throw new DdlException(e.getMessage()); } @@ -848,7 +867,7 @@ public void removeSchemaChangeJob(long dbId, long tableId, long indexId, long pa } if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { - LOG.warn("dropIndex response: {} ", response); + LOG.warn("finishTabletJob response: {} ", response); } } diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 483d3a3d513530..56549a5124162e 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -536,7 +536,7 @@ message TabletCompactionJobPB { optional int64 lease = 23; // prepare optional int64 delete_bitmap_lock_initiator = 24; optional int64 full_compaction_cnt = 25; // prepare - optional bool judge_input_versions_range = 26; + optional bool check_input_versions_range = 26; } message TabletSchemaChangeJobPB { diff --git a/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy b/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy new file mode 100644 index 00000000000000..6fc8003527dc02 --- /dev/null +++ b/regression-test/suites/cloud_p0/schema_change/compaction10/test_schema_change_with_compaction10.groovy @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_schema_change_with_compaction10') { + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.beConfigs += [ "disable_auto_compaction=true" ] + options.beNum = 1 + docker(options) { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + sql new File("""${context.file.parent}/../ddl/date_delete.sql""").text + def load_date_once = { String table -> + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = table + "_" + uniqueID + // load data from cos + def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql + + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + sql new File("""${context.file.parent}/../ddl/date_unique_create.sql""").text + def injectName = 'CloudSchemaChangeJob.process_alter_tablet.sleep' + def injectBe = null + def backends = sql_return_maparray('show backends') + def array = sql_return_maparray("SHOW TABLETS FROM date") + def injectBeId = array[0].BackendId + def originTabletId = array[0].TabletId + injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null) + assertNotNull(injectBe) + + def load_delete_compaction = { + load_date_once("date"); + sql "delete from date where d_datekey < 19900000" + sql "select count(*) from date" + // cu compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + boolean running = true + do { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + try { + load_delete_compaction() + load_delete_compaction() + load_delete_compaction() + + load_date_once("date"); + + sleep(1000) + GetDebugPoint().enableDebugPointForAllBEs(injectName) + sql 
"ALTER TABLE date MODIFY COLUMN d_holidayfl bigint(11)" + sleep(5000) + array = sql_return_maparray("SHOW TABLETS FROM date") + + for (int i = 0; i < 5; i++) { + load_date_once("date"); + } + + cluster.restartBackends() + GetDebugPoint().enableDebugPointForAllBEs(injectName) + sleep(30000) + + // base compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + boolean running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + def newTabletId = array[1].TabletId + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("invalid tablet state.")) + + + // cu compaction + for (int i = 0; i < array.size(); i++) { + tabletId = array[i].TabletId + logger.info("run compaction:" + tabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + } + + for (int i = 0; i < array.size(); i++) { + running = true + do { + Thread.sleep(100) + tabletId = array[i].TabletId + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + } finally { + if (injectBe != null) { + GetDebugPoint().disableDebugPointForAllBEs(injectName) + } + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState("date") + if (result == "FINISHED" || result == "CANCELLED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + assertEquals(result, "FINISHED"); + def count = sql """ select count(*) from date; """ + assertEquals(count[0][0], 2556); + // check rowsets + logger.info("run show:" + originTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-8]")) + assertTrue(out.contains("[9-13]")) + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[7-7]")) + assertTrue(out.contains("[8-8]")) + assertTrue(out.contains("[9-13]")) + + // base compaction + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: 
code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + boolean running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-8]")) + assertTrue(out.contains("[9-13]")) + + for (int i = 0; i < 3; i++) { + load_date_once("date"); + } + + sql """ select count(*) from date """ + + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + // wait for all compactions done + running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-16]")) + } + } +} \ No newline at end of file diff --git a/regression-test/suites/cloud_p0/schema_change/compaction2/test_schema_change_with_compaction3.groovy b/regression-test/suites/cloud_p0/schema_change/compaction2/test_schema_change_with_compaction2.groovy similarity index 100% rename from regression-test/suites/cloud_p0/schema_change/compaction2/test_schema_change_with_compaction3.groovy rename to regression-test/suites/cloud_p0/schema_change/compaction2/test_schema_change_with_compaction2.groovy diff --git a/regression-test/suites/cloud_p0/schema_change/compaction3/test_schema_change_with_compaction5.groovy b/regression-test/suites/cloud_p0/schema_change/compaction3/test_schema_change_with_compaction3.groovy similarity index 100% rename from regression-test/suites/cloud_p0/schema_change/compaction3/test_schema_change_with_compaction5.groovy rename to regression-test/suites/cloud_p0/schema_change/compaction3/test_schema_change_with_compaction3.groovy diff --git a/regression-test/suites/cloud_p0/schema_change/compaction4/test_schema_change_with_compaction6.groovy b/regression-test/suites/cloud_p0/schema_change/compaction4/test_schema_change_with_compaction4.groovy similarity index 100% rename from regression-test/suites/cloud_p0/schema_change/compaction4/test_schema_change_with_compaction6.groovy rename to regression-test/suites/cloud_p0/schema_change/compaction4/test_schema_change_with_compaction4.groovy diff --git a/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction7.groovy 
b/regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy similarity index 100% rename from regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction7.groovy rename to regression-test/suites/cloud_p0/schema_change/compaction5/test_schema_change_with_compaction5.groovy diff --git a/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction9.groovy b/regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy similarity index 100% rename from regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction9.groovy rename to regression-test/suites/cloud_p0/schema_change/compaction6/test_schema_change_with_compaction6.groovy diff --git a/regression-test/suites/cloud_p0/schema_change/compaction7/test_schema_change_with_compaction7.groovy b/regression-test/suites/cloud_p0/schema_change/compaction7/test_schema_change_with_compaction7.groovy new file mode 100644 index 00000000000000..7291ea3a341e44 --- /dev/null +++ b/regression-test/suites/cloud_p0/schema_change/compaction7/test_schema_change_with_compaction7.groovy @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Most of the cases are copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. 
+ +// Note: To filter out tables from sql files, use the following one-liner command +// sed -nr 's/.*tables: (.*)$/\1/gp' /path/to/*.sql | sed -nr 's/,/\n/gp' | sort | uniq + +import org.apache.doris.regression.util.DebugPoint + +import org.apache.doris.regression.util.NodeType + +suite('test_schema_change_with_compaction7', 'nonConcurrent') { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + sql new File("""${context.file.parent}/../ddl/date_delete.sql""").text + def load_date_once = { String table -> + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = table + "_" + uniqueID + // load data from cos + def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql + + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + sql new File("""${context.file.parent}/../ddl/date_unique_create.sql""").text + def injectName = 'CloudSchemaChangeJob.process_alter_tablet.sleep' + def injectBe = null + def backends = sql_return_maparray('show backends') + def array = sql_return_maparray("SHOW TABLETS FROM date") + def injectBeId = array[0].BackendId + def originTabletId = array[0].TabletId + injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null) + assertNotNull(injectBe) + + def load_delete_compaction = { + load_date_once("date"); + sql "delete from date where d_datekey < 19900000" + sql "select count(*) from date" + // cu compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + boolean running = true + do { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + try { + load_delete_compaction() + load_delete_compaction() + load_delete_compaction() + + load_date_once("date"); + + sleep(1000) + + DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, injectName) + sql "ALTER TABLE date MODIFY COLUMN d_holidayfl bigint(11)" + sleep(15000) + array = sql_return_maparray("SHOW TABLETS 
FROM date") + + for (int i = 0; i < 5; i++) { + load_date_once("date"); + } + // base compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + boolean running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + def newTabletId = array[1].TabletId + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("invalid tablet state.")) + + + // cu compaction + for (int i = 0; i < array.size(); i++) { + tabletId = array[i].TabletId + logger.info("run compaction:" + tabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + } + + for (int i = 0; i < array.size(); i++) { + running = true + do { + Thread.sleep(100) + tabletId = array[i].TabletId + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + } finally { + if (injectBe != null) { + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, injectName) + } + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState("date") + if (result == "FINISHED" || result == "CANCELLED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + assertEquals(result, "FINISHED"); + def count = sql """ select count(*) from date; """ + assertEquals(count[0][0], 2556); + // check rowsets + logger.info("run show:" + originTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-8]")) + assertTrue(out.contains("[9-13]")) + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[7-7]")) + assertTrue(out.contains("[8-8]")) + assertTrue(out.contains("[9-13]")) + + // base compaction + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + boolean running = true + while (running) { + Thread.sleep(100) + (code, out, err) = 
be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-8]")) + assertTrue(out.contains("[9-13]")) + + for (int i = 0; i < 3; i++) { + load_date_once("date"); + } + + sql """ select count(*) from date """ + + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + // wait for all compactions done + running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-16]")) + } + +} \ No newline at end of file diff --git a/regression-test/suites/cloud_p0/schema_change/compaction8/test_schema_change_with_compaction8.groovy b/regression-test/suites/cloud_p0/schema_change/compaction8/test_schema_change_with_compaction8.groovy new file mode 100644 index 00000000000000..1017e1d50f235f --- /dev/null +++ b/regression-test/suites/cloud_p0/schema_change/compaction8/test_schema_change_with_compaction8.groovy @@ -0,0 +1,214 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Most of the cases are copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. 
+ +// Note: To filter out tables from sql files, use the following one-liner command +// sed -nr 's/.*tables: (.*)$/\1/gp' /path/to/*.sql | sed -nr 's/,/\n/gp' | sort | uniq + +import org.apache.doris.regression.util.DebugPoint + +import org.apache.doris.regression.util.NodeType + +suite('test_schema_change_with_compaction8', 'nonConcurrent') { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + sql new File("""${context.file.parent}/../ddl/date_delete.sql""").text + def load_date_once = { String table -> + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = table + "_" + uniqueID + // load data from cos + def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql + + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + sql new File("""${context.file.parent}/../ddl/date_unique_create.sql""").text + def injectName = 'CloudSchemaChangeJob.process_alter_tablet.sleep' + def injectBe = null + def backends = sql_return_maparray('show backends') + def array = sql_return_maparray("SHOW TABLETS FROM date") + def injectBeId = array[0].BackendId + def originTabletId = array[0].TabletId + injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null) + assertNotNull(injectBe) + + def load_delete_compaction = { + load_date_once("date"); + sql "delete from date where d_datekey < 19900000" + sql "select count(*) from date" + // cu compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + boolean running = true + do { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + try { + load_delete_compaction() + load_delete_compaction() + load_delete_compaction() + + + sleep(1000) + + DebugPoint.enableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, injectName) + sql "ALTER TABLE date MODIFY COLUMN d_holidayfl bigint(11)" + sleep(5000) + array = sql_return_maparray("SHOW TABLETS FROM date") + + + // base 
compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + boolean running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + def newTabletId = array[1].TabletId + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("invalid tablet state.")) + + } finally { + if (injectBe != null) { + DebugPoint.disableDebugPoint(injectBe.Host, injectBe.HttpPort.toInteger(), NodeType.BE, injectName) + } + int max_try_time = 3000 + while (max_try_time--){ + result = getJobState("date") + if (result == "FINISHED") { + sleep(3000) + break + } else { + sleep(100) + if (max_try_time < 1){ + assertEquals(1,2) + } + } + } + for (int i = 0; i < 5; i++) { + load_date_once("date"); + } + def count = sql """ select count(*) from date; """ + assertEquals(count[0][0], 2556); + // check rowsets + logger.info("run show:" + originTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + + logger.info("run show:" + newTabletId) + (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-2]")) + assertTrue(out.contains("[7-7]")) + + // base compaction + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + boolean running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + + logger.info("run show:" + newTabletId) + (code, out, err) = 
be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("[0-1]")) + assertTrue(out.contains("[2-7]")) + assertTrue(out.contains("[8-12]")) + } + +} \ No newline at end of file diff --git a/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy b/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy new file mode 100644 index 00000000000000..6cb47e01f4b62c --- /dev/null +++ b/regression-test/suites/cloud_p0/schema_change/compaction9/test_schema_change_with_compaction9.groovy @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.http.NoHttpResponseException +import org.apache.doris.regression.util.DebugPoint +import org.apache.doris.regression.util.NodeType + +suite('test_schema_change_with_compaction9') { + def options = new ClusterOptions() + options.cloudMode = true + options.enableDebugPoints() + options.beConfigs += [ "enable_java_support=false" ] + options.beConfigs += [ "disable_auto_compaction=true" ] + options.beNum = 1 + docker(options) { + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + def s3BucketName = getS3BucketName() + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}", + |"provider" = "${getS3Provider()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + sql new File("""${context.file.parent}/../ddl/date_delete.sql""").text + def load_date_once = { String table -> + def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + def loadLabel = table + "_" + uniqueID + // load data from cos + def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties + sql loadSql + + // check load state + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + 
sql new File("""${context.file.parent}/../ddl/date_unique_create.sql""").text + def injectName = 'CloudSchemaChangeJob.process_alter_tablet.sleep' + def injectBe = null + def backends = sql_return_maparray('show backends') + def array = sql_return_maparray("SHOW TABLETS FROM date") + def injectBeId = array[0].BackendId + def originTabletId = array[0].TabletId + injectBe = backends.stream().filter(be -> be.BackendId == injectBeId).findFirst().orElse(null) + assertNotNull(injectBe) + + def load_delete_compaction = { + load_date_once("date"); + sql "delete from date where d_datekey < 19900000" + sql "select count(*) from date" + // cu compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + boolean running = true + do { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + try { + load_delete_compaction() + load_delete_compaction() + load_delete_compaction() + + load_date_once("date"); + + sleep(1000) + GetDebugPoint().enableDebugPointForAllBEs(injectName) + sql "ALTER TABLE date MODIFY COLUMN d_holidayfl bigint(11)" + sleep(5000) + array = sql_return_maparray("SHOW TABLETS FROM date") + + for (int i = 0; i < 5; i++) { + load_date_once("date"); + } + // base compaction + logger.info("run compaction:" + originTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + + + // wait for all compactions done + boolean running = true + while (running) { + Thread.sleep(100) + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, originTabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + def newTabletId = array[1].TabletId + logger.info("run compaction:" + newTabletId) + (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertTrue(out.contains("invalid tablet state.")) + + + // cu compaction + for (int i = 0; i < array.size(); i++) { + tabletId = array[i].TabletId + logger.info("run compaction:" + tabletId) + (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + } + + for (int i = 0; i < array.size(); i++) { + running = true + do { + Thread.sleep(100) + tabletId = array[i].TabletId + (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + 
+            cluster.restartFrontends()
+            sleep(30000)
+            context.reconnectFe()
+        } finally {
+            if (injectBe != null) {
+                GetDebugPoint().disableDebugPointForAllBEs(injectName)
+            }
+            // Groovy truthiness: loop until max_try_time counts down to 0
+            int max_try_time = 3000
+            while (max_try_time--) {
+                result = getJobState("date")
+                if (result == "FINISHED" || result == "CANCELLED") {
+                    sleep(3000)
+                    break
+                } else {
+                    sleep(100)
+                    if (max_try_time < 1) {
+                        // deliberately fail: the schema change job never
+                        // reached a terminal state within the retry budget
+                        assertEquals(1, 2)
+                    }
+                }
+            }
+            assertEquals(result, "FINISHED");
+            def count = sql """ select count(*) from date; """
+            assertEquals(count[0][0], 2556);
+            // check rowsets
+            logger.info("run show:" + originTabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, originTabletId)
+            logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-7]"))
+            assertTrue(out.contains("[8-8]"))
+            assertTrue(out.contains("[9-13]"))
+
+            logger.info("run show:" + newTabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-2]"))
+            assertTrue(out.contains("[7-7]"))
+            assertTrue(out.contains("[8-8]"))
+            assertTrue(out.contains("[9-13]"))
+
+            // base compaction
+            logger.info("run compaction:" + newTabletId)
+            (code, out, err) = be_run_base_compaction(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+            // wait for all compactions done
+            boolean running = true
+            while (running) {
+                Thread.sleep(100)
+                (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            }
+
+            logger.info("run show:" + newTabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-7]"))
+            assertTrue(out.contains("[8-8]"))
+            assertTrue(out.contains("[9-13]"))
+
+            for (int i = 0; i < 3; i++) {
+                load_date_once("date");
+            }
+
+            sql """ select count(*) from date """
+
+            logger.info("run compaction:" + newTabletId)
+            (code, out, err) = be_run_cumulative_compaction(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+            // wait for all compactions done
+            running = true
+            while (running) {
+                Thread.sleep(100)
+                (code, out, err) = be_get_compaction_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            }
+
+            logger.info("run show:" + newTabletId)
+            (code, out, err) = be_show_tablet_status(injectBe.Host, injectBe.HttpPort, newTabletId)
+            logger.info("Run show: code=" + code + ", out=" + out + ", err=" + err)
+            assertTrue(out.contains("[0-1]"))
+            assertTrue(out.contains("[2-7]"))
+            assertTrue(out.contains("[8-16]"))
+        }
+    }
+}
\ No newline at end of file
diff --git a/regression-test/suites/cloud_p0/schema_change/ddl/date_create.sql b/regression-test/suites/cloud_p0/schema_change/ddl/date_create.sql
new file mode 100644
index 00000000000000..8486d7178bbe5d
--- /dev/null
+++ b/regression-test/suites/cloud_p0/schema_change/ddl/date_create.sql
@@ -0,0 +1,23 @@
+CREATE TABLE IF NOT EXISTS `date` (
+  `d_datekey` int(11) NOT NULL COMMENT "",
+  `d_date` varchar(20) NOT NULL COMMENT "",
+  `d_dayofweek` varchar(10) NOT NULL COMMENT "",
+  `d_month` varchar(11) NOT NULL COMMENT "",
+  `d_year` int(11) NOT NULL COMMENT "",
+  `d_yearmonthnum` int(11) NOT NULL COMMENT "",
+  `d_yearmonth` varchar(9) NOT NULL COMMENT "",
+  `d_daynuminweek` int(11) NOT NULL COMMENT "",
+  `d_daynuminmonth` int(11) NOT NULL COMMENT "",
+  `d_daynuminyear` int(11) NOT NULL COMMENT "",
+  `d_monthnuminyear` int(11) NOT NULL COMMENT "",
+  `d_weeknuminyear` int(11) NOT NULL COMMENT "",
+  `d_sellingseason` varchar(14) NOT NULL COMMENT "",
+  `d_lastdayinweekfl` int(11) NOT NULL COMMENT "",
+  `d_lastdayinmonthfl` int(11) NOT NULL COMMENT "",
+  `d_holidayfl` int(11) NOT NULL COMMENT "",
+  `d_weekdayfl` int(11) NOT NULL COMMENT ""
+)
+DISTRIBUTED BY HASH(`d_datekey`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "1"
+);
\ No newline at end of file
diff --git a/regression-test/suites/cloud_p0/schema_change/ddl/date_delete.sql b/regression-test/suites/cloud_p0/schema_change/ddl/date_delete.sql
new file mode 100644
index 00000000000000..41702d336d7e7f
--- /dev/null
+++ b/regression-test/suites/cloud_p0/schema_change/ddl/date_delete.sql
@@ -0,0 +1 @@
+drop table if exists `date` FORCE;
\ No newline at end of file
diff --git a/regression-test/suites/cloud_p0/schema_change/ddl/date_load.sql b/regression-test/suites/cloud_p0/schema_change/ddl/date_load.sql
new file mode 100644
index 00000000000000..3e1511ca69a67a
--- /dev/null
+++ b/regression-test/suites/cloud_p0/schema_change/ddl/date_load.sql
@@ -0,0 +1,6 @@
+LOAD LABEL ${loadLabel} (
+    DATA INFILE("s3://${s3BucketName}/regression/ssb/sf100/date.tbl.gz")
+    INTO TABLE date
+    COLUMNS TERMINATED BY "|"
+    (d_datekey,d_date,d_dayofweek,d_month,d_year,d_yearmonthnum,d_yearmonth, d_daynuminweek,d_daynuminmonth,d_daynuminyear,d_monthnuminyear,d_weeknuminyear, d_sellingseason,d_lastdayinweekfl,d_lastdayinmonthfl,d_holidayfl,d_weekdayfl,temp)
+)
diff --git a/regression-test/suites/cloud_p0/schema_change/ddl/date_unique_create.sql b/regression-test/suites/cloud_p0/schema_change/ddl/date_unique_create.sql
new file mode 100644
index 00000000000000..0c3005c6e03f77
--- /dev/null
+++ b/regression-test/suites/cloud_p0/schema_change/ddl/date_unique_create.sql
@@ -0,0 +1,26 @@
+CREATE TABLE IF NOT EXISTS `date` (
+  `d_datekey` int(11) NOT NULL COMMENT "",
+  `d_date` varchar(20) NOT NULL COMMENT "",
+  `d_dayofweek` varchar(10) NOT NULL COMMENT "",
+  `d_month` varchar(11) NOT NULL COMMENT "",
+  `d_year` int(11) NOT NULL COMMENT "",
+  `d_yearmonthnum` int(11) NOT NULL COMMENT "",
+  `d_yearmonth` varchar(9) NOT NULL COMMENT "",
+  `d_daynuminweek` int(11) NOT NULL COMMENT "",
+  `d_daynuminmonth` int(11) NOT NULL COMMENT "",
+  `d_daynuminyear` int(11) NOT NULL COMMENT "",
+  `d_monthnuminyear` int(11) NOT NULL COMMENT "",
+  `d_weeknuminyear` int(11) NOT NULL COMMENT "",
+  `d_sellingseason` varchar(14) NOT NULL COMMENT "",
+  `d_lastdayinweekfl` int(11) NOT NULL COMMENT "",
+  `d_lastdayinmonthfl` int(11) NOT NULL COMMENT "",
+  `d_holidayfl` int(11) NOT NULL COMMENT "",
+  `d_weekdayfl` int(11) NOT NULL COMMENT ""
+)
+UNIQUE KEY (`d_datekey`)
+DISTRIBUTED BY HASH(`d_datekey`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "1",
+"enable_unique_key_merge_on_write" = "true",
+"enable_mow_light_delete" = "true"
+);
\ No newline at end of file