From c3f80f191bf580dd54502122990ab617d877cecc Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 10 Nov 2023 09:58:39 +0800 Subject: [PATCH] [Fix](partial update) Fix core when doing partial update on tables with row column after schema change #26632 (#26695) --- .../olap/rowset/segment_v2/segment_writer.cpp | 4 +- be/src/olap/tablet.cpp | 14 +- be/src/olap/tablet.h | 6 +- ...partial_update_schema_change_row_store.out | 76 + .../test_partial_update_schema_change.groovy | 7 +- ...tial_update_schema_change_row_store.groovy | 1223 +++++++++++++++++ 6 files changed, 1319 insertions(+), 11 deletions(-) create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out create mode 100644 regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index add9e7f8697cc5..8e7bd5a90eb691 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -578,8 +578,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f read_index[id_and_pos.pos] = read_idx++; } if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column(rowset, seg_it.first, rids, - cids_missing, old_value_block); + auto st = _tablet->fetch_value_through_row_column( + rowset, *_tablet_schema, seg_it.first, rids, cids_missing, old_value_block); if (!st.ok()) { LOG(WARNING) << "failed to fetch value through row column"; return st; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 17f522a3b841ad..7c9a6ecfaa6fd5 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2686,7 +2686,8 @@ void Tablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) { } // fetch value by row column -Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, 
uint32_t segid, +Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, + const TabletSchema& tablet_schema, uint32_t segid, const std::vector& rowids, const std::vector& cids, vectorized::Block& block) { @@ -2694,7 +2695,6 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint BetaRowsetSharedPtr rowset = std::static_pointer_cast(input_rowset); CHECK(rowset); - const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); SegmentCacheHandle segment_cache; RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true)); // find segment @@ -2713,10 +2713,10 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint LOG_EVERY_N(INFO, 500) << "fetch_value_by_rowids, cost(us):" << watch.elapsed_time() / 1000 << ", row_batch_size:" << rowids.size(); }); - CHECK(tablet_schema->store_row_column()); + CHECK(tablet_schema.store_row_column()); // create _source column std::unique_ptr column_iterator; - RETURN_IF_ERROR(segment->new_column_iterator(tablet_schema->column(BeConsts::ROW_STORE_COL), + RETURN_IF_ERROR(segment->new_column_iterator(tablet_schema.column(BeConsts::ROW_STORE_COL), &column_iterator)); segment_v2::ColumnIteratorOptions opt; OlapReaderStatistics stats; @@ -2735,7 +2735,7 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint std::vector default_values; default_values.resize(cids.size()); for (int i = 0; i < cids.size(); ++i) { - const TabletColumn& column = tablet_schema->column(cids[i]); + const TabletColumn& column = tablet_schema.column(cids[i]); vectorized::DataTypePtr type = vectorized::DataTypeFactory::instance().create_data_type(column); col_uid_to_idx[column.unique_id()] = i; @@ -3253,8 +3253,8 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema, (*read_index)[id_and_pos.pos] = read_idx++; } if (has_row_column) { - auto st = fetch_value_through_row_column(rowset_iter->second, seg_it.first, rids, - 
cids_to_read, block); + auto st = fetch_value_through_row_column(rowset_iter->second, *tablet_schema, + seg_it.first, rids, cids_to_read, block); if (!st.ok()) { LOG(WARNING) << "failed to fetch value through row column"; return st; diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 7fbf2b9a98f3a7..f7176bd4bcca45 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -442,7 +442,11 @@ class Tablet : public BaseTablet { const TabletColumn& tablet_column, vectorized::MutableColumnPtr& dst); - Status fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint32_t segid, + // We use the TabletSchema from the caller because the TabletSchema in the rowset's meta + // may be outdated due to schema change. Also note that the cids should indicate the indexes + // of the columns in the TabletSchema passed in. + Status fetch_value_through_row_column(RowsetSharedPtr input_rowset, + const TabletSchema& tablet_schema, uint32_t segid, + const std::vector& rowids, const std::vector& cids, vectorized::Block& block); diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out new file mode 100644 index 00000000000000..86df3374712e9f --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.out @@ -0,0 +1,76 @@ +-- This file is automatically generated.
You should know what you did if you want to edit this +-- !sql1 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql2 -- +1 1 1 0 0 0 0 0 0 0 0 + +-- !sql3 -- +1 1 1 0 0 0 0 0 0 0 10 + +-- !sql4 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql5 -- +1 1 1 0 0 0 0 0 0 + +-- !sql6 -- +1 2 1 0 0 0 0 1 0 + +-- !sql7 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql8 -- +1 1 1.0 0 0 0 0 0 0 0 + +-- !sql9 -- +1 + +-- !sql10 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql11 -- +1 1 1 0 0 0 0 0 0 0 + +-- !sql12 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql13 -- +1 1 1 0 0 0 0 0 0 0 + +-- !sql14 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql15 -- +1 1 1 0 0 0 0 0 0 0 0 + +-- !sql16 -- +1 1 1 0 0 0 0 0 0 0 10 + +-- !sql17 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql18 -- +1 1 1 0 0 0 0 0 0 + +-- !sql19 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql20 -- +1 1 1.0 0 0 0 0 0 0 0 + +-- !sql21 -- +1 + +-- !sql23 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql24 -- +1 1 1 0 0 0 0 0 0 0 + +-- !sql25 -- +1 0 0 0 0 0 0 0 0 0 + +-- !sql26 -- +1 1 1 0 0 0 0 0 0 0 + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy index 332662af4f4aa7..4bce896ed48ac1 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy @@ -17,7 +17,10 @@ // under the License. 
suite("test_partial_update_schema_change", "p0") { - // test add value column + + /* ============================================== light schema change cases: ============================================== */ + + // test add value column def tableName = "test_partial_update_light_schema_change_add_column" sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ @@ -618,6 +621,8 @@ suite("test_partial_update_schema_change", "p0") { sql """ DROP TABLE IF EXISTS ${tableName} """ + /* ============================================== schema change cases: ============================================== */ + // test add value column tableName = "test_partial_update_schema_change_add_column" sql """ DROP TABLE IF EXISTS ${tableName} """ diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy new file mode 100644 index 00000000000000..1fa4ec39d36d04 --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change_row_store.groovy @@ -0,0 +1,1223 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_partial_update_row_store_schema_change", "p0") { + + /* ============================================== light schema change cases: ============================================== */ + + // test add value column + def tableName = "test_partial_update_row_store_light_schema_change_add_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql1 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' " + def try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data without new column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_without_new_column.csv' + 
time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + // check data, new column is filled by default value. + qt_sql2 " select * from ${tableName} order by c0 " + + // test load data with new column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2, c10' + + file 'schema_change/load_with_new_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + // check data, new column is filled by given value. 
+ qt_sql3 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test delete value column + tableName = "test_partial_update_row_store_light_schema_change_delete_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql4 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} DROP COLUMN c8 " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data without delete column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_delete_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != 
null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql5 " select * from ${tableName} order by c0 " + + // test load data with delete column, stream load will ignore the + // non-existing column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c7, c8' + + file 'schema_change/load_without_delete_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + // check result, which is fail for loading delete column. + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql6 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test update value column + tableName = "test_partial_update_row_store_light_schema_change_update_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 
'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql7 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} MODIFY COLUMN c2 double " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data with update column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_update_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql8 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test add key column + tableName = "test_partial_update_row_store_light_schema_change_add_key_column" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + 
"enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0' + + file 'schema_change/load1.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql9 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} ADD COLUMN c1 int key null " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + sql " ALTER table ${tableName} ADD COLUMN c2 int null " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data with all key column, should fail because + // it don't have any value columns + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1' + + file 'schema_change/load_with_key_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertEquals(0, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + 
} + } + + sql "sync" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test create index + tableName = "test_partial_update_row_store_light_schema_change_create_index" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql10 " select * from ${tableName} order by c0 " + + sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + //test load data with create index + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_create_index.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream 
load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql11 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + // test change properties + tableName = "test_partial_update_row_store_light_schema_change_properties" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql12 " select * from ${tableName} order by c0 " + + sql " ALTER TABLE ${tableName} set ('in_memory' = 'false') " + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_change_properties.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + 
log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql13 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + /* ============================================== schema change cases: ============================================== */ + + // test add value column + tableName = "test_partial_update_row_store_schema_change_add_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql14 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + 
break; + } + assert(try_times>0) + try_times-- + } + + // test load data without new column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_without_new_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + // check data, new column is filled by default value. + qt_sql15 " select * from ${tableName} order by c0 " + + // test load data with new column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2, c10' + + file 'schema_change/load_with_new_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + // check data, new column is filled by given value. 
+ qt_sql16 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test delete value column + tableName = "test_partial_update_row_store_schema_change_delete_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql17 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} DROP COLUMN c8 " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data without delete column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_delete_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + 
log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql18 " select * from ${tableName} order by c0 " + + // test load data with delete column + // todo bug + // streamLoad { + // table "${tableName}" + + // set 'column_separator', ',' + // set 'partial_columns', 'true' + // set 'columns', 'c0, c1, c8' + + // file 'schema_change/load_without_delete_column.csv' + // time 10000 // limit inflight 10s + + // check { result, exception, startTime, endTime -> + // if (exception != null) { + // throw exception + // } + // // check result, which is fail for loading delete column. + // log.info("Stream load result: ${result}".toString()) + // def json = parseJson(result) + // assertEquals("fail", json.Status.toLowerCase()) + // assertEquals(1, json.NumberTotalRows) + // assertEquals(1, json.NumberFilteredRows) + // assertEquals(0, json.NumberUnselectedRows) + // } + // } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test update value column + tableName = "test_partial_update_row_store_schema_change_update_column" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw 
exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql19 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} MODIFY COLUMN c2 double " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data with update column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_update_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql20 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test add key column + tableName = "test_partial_update_row_store_schema_change_add_key_column" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0' + + file 'schema_change/load1.csv' + time 10000 // limit 
inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql21 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} ADD COLUMN c1 int key null " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + sql " ALTER table ${tableName} ADD COLUMN c2 int null " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + // test load data with all key column + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1' + + file 'schema_change/load_with_key_column.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertEquals(0, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test create index + tableName = "test_partial_update_row_store_schema_change_create_index" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, 
+ `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql23 " select * from ${tableName} order by c0 " + + sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP " + try_times=100 + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + Thread.sleep(1200) + if(res[0][9].toString() == "FINISHED"){ + break; + } + assert(try_times>0) + try_times-- + } + + //test load data with create index + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_create_index.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql24 " select * from ${tableName} order 
by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + // test change properties + tableName = "test_partial_update_row_store_schema_change_properties" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "store_row_column" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql25 " select * from ${tableName} order by c0 " + + sql " ALTER TABLE ${tableName} set ('in_memory' = 'false') " + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_change_properties.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + + sql "sync" + + qt_sql26 " select * from ${tableName} order by c0 " + + sql """ DROP 
TABLE IF EXISTS ${tableName} """ +}