Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug](insert )fix insert wrong data on mv when stmt have multiple values #27297

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -641,25 +641,6 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx
checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema());

realTargetColumnNames = targetColumns.stream().map(Column::getName).collect(Collectors.toList());
Map<String, Expr> slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (int i = 0; i < queryStmt.getResultExprs().size(); i++) {
Expr expr = queryStmt.getResultExprs().get(i);
if (!(expr instanceof StringLiteral && ((StringLiteral) expr).getValue()
.equals(SelectStmt.DEFAULT_VALUE))) {
slotToIndex.put(realTargetColumnNames.get(i), queryStmt.getResultExprs().get(i)
.checkTypeCompatibility(targetTable.getColumn(realTargetColumnNames.get(i)).getType()));
}
}

for (Column column : targetTable.getBaseSchema()) {
if (!slotToIndex.containsKey(column.getName())) {
if (column.getDefaultValue() == null) {
slotToIndex.put(column.getName(), new NullLiteral());
} else {
slotToIndex.put(column.getName(), new StringLiteral(column.getDefaultValue()));
}
}
}

// handle VALUES() or SELECT constant list
if (isValuesOrConstantSelect) {
Expand All @@ -668,7 +649,8 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx
// INSERT INTO VALUES(...)
List<ArrayList<Expr>> rows = selectStmt.getValueList().getRows();
for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) {
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex, skipCheck);
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, realTargetColumnNames,
skipCheck);
}

// clear these 2 structures, rebuild them using VALUES exprs
Expand All @@ -686,7 +668,8 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx
// `selectStmt.getResultExprs().clear();` will clear the `rows` too, causing
// error.
rows.add(Lists.newArrayList(selectStmt.getResultExprs()));
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex, skipCheck);
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, realTargetColumnNames,
skipCheck);
// rows may be changed in analyzeRow(), so rebuild the result exprs
selectStmt.getResultExprs().clear();
for (Expr expr : rows.get(0)) {
Expand All @@ -697,6 +680,8 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx
// INSERT INTO SELECT ... FROM tbl
if (!origColIdxsForExtendCols.isEmpty()) {
// extend the result expr by duplicating the related exprs
Map<String, Expr> slotToIndex = buildSlotToIndex(queryStmt.getResultExprs(), realTargetColumnNames,
analyzer);
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first));
Expand Down Expand Up @@ -726,6 +711,8 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx
// expand colLabels in QueryStmt
if (!origColIdxsForExtendCols.isEmpty()) {
if (queryStmt.getResultExprs().size() != queryStmt.getBaseTblResultExprs().size()) {
Map<String, Expr> slotToIndex = buildSlotToIndex(queryStmt.getBaseTblResultExprs(),
realTargetColumnNames, analyzer);
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first));
Expand Down Expand Up @@ -764,9 +751,33 @@ private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserEx
}
}

private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<ArrayList<Expr>> rows,
int rowIdx, List<Pair<Integer, Column>> origColIdxsForExtendCols, Map<String, Expr> slotToIndex,
boolean skipCheck) throws AnalysisException {
private Map<String, Expr> buildSlotToIndex(ArrayList<Expr> row, List<String> realTargetColumnNames,
Analyzer analyzer) throws AnalysisException {
Map<String, Expr> slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (int i = 0; i < row.size(); i++) {
Expr expr = row.get(i);
expr.analyze(analyzer);
if (!(expr instanceof StringLiteral
&& ((StringLiteral) expr).getValue().equals(SelectStmt.DEFAULT_VALUE))) {
slotToIndex.put(realTargetColumnNames.get(i),
expr.checkTypeCompatibility(targetTable.getColumn(realTargetColumnNames.get(i)).getType()));
}
}
for (Column column : targetTable.getBaseSchema()) {
if (!slotToIndex.containsKey(column.getName())) {
if (column.getDefaultValue() == null) {
slotToIndex.put(column.getName(), new NullLiteral());
} else {
slotToIndex.put(column.getName(), new StringLiteral(column.getDefaultValue()));
}
}
}
return slotToIndex;
}

private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<ArrayList<Expr>> rows, int rowIdx,
List<Pair<Integer, Column>> origColIdxsForExtendCols, List<String> realTargetColumnNames, boolean skipCheck)
throws AnalysisException {
// 1. check number of fields if equal with first row
// targetColumns contains some shadow columns, which is added by system,
// so we should minus this
Expand All @@ -778,6 +789,8 @@ private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<Arra
}

ArrayList<Expr> row = rows.get(rowIdx);
Map<String, Expr> slotToIndex = buildSlotToIndex(row, realTargetColumnNames, analyzer);

if (!origColIdxsForExtendCols.isEmpty()) {
/**
* we should extend the row for shadow columns.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_star --
1 1 1 2020-02-02 1
1 2 2 2020-02-02 1

-- !select_mv --
1 1
2 1

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite ("test_insert_multi") {

sql """ DROP TABLE IF EXISTS sales_records; """

sql """
create table sales_records(record_id int, seller_id int, store_id int, sale_date date, sale_amt bigint) distributed by hash(record_id) properties("replication_num" = "1");
"""

createMV ("create materialized view store_amt as select store_id, sum(sale_amt) from sales_records group by store_id;")

sql """insert into sales_records values(1,1,1,"2020-02-02",1),(1,2,2,"2020-02-02",1);"""

qt_select_star "select * from sales_records order by 1,2;"

explain {
sql(" SELECT store_id, sum(sale_amt) FROM sales_records GROUP BY store_id order by 1;")
contains "(store_amt)"
}
qt_select_mv " SELECT store_id, sum(sale_amt) FROM sales_records GROUP BY store_id order by 1;"
}