Skip to content

Commit

Permalink
[Bug](insert)fix insert wrong data on mv when stmt have multiple valu…
Browse files Browse the repository at this point in the history
…es (#27297) (#27382)

fix insert wrong data on mv when stmt have multiple values
  • Loading branch information
BiteTheDDDDt authored Nov 22, 2023
1 parent 1a94ccb commit 17a92ef
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -595,25 +595,6 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema());

realTargetColumnNames = targetColumns.stream().map(Column::getName).collect(Collectors.toList());
Map<String, Expr> slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (int i = 0; i < queryStmt.getResultExprs().size(); i++) {
Expr expr = queryStmt.getResultExprs().get(i);
if (!(expr instanceof StringLiteral && ((StringLiteral) expr).getValue()
.equals(SelectStmt.DEFAULT_VALUE))) {
slotToIndex.put(realTargetColumnNames.get(i), queryStmt.getResultExprs().get(i)
.checkTypeCompatibility(targetTable.getColumn(realTargetColumnNames.get(i)).getType()));
}
}

for (Column column : targetTable.getBaseSchema()) {
if (!slotToIndex.containsKey(column.getName())) {
if (column.getDefaultValue() == null) {
slotToIndex.put(column.getName(), new NullLiteral());
} else {
slotToIndex.put(column.getName(), new StringLiteral(column.getDefaultValue()));
}
}
}

// handle VALUES() or SELECT constant list
if (isValuesOrConstantSelect) {
Expand All @@ -622,7 +603,7 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
// INSERT INTO VALUES(...)
List<ArrayList<Expr>> rows = selectStmt.getValueList().getRows();
for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) {
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex);
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, realTargetColumnNames);
}

// clear these 2 structures, rebuild them using VALUES exprs
Expand All @@ -640,7 +621,7 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
// `selectStmt.getResultExprs().clear();` will clear the `rows` too, causing
// error.
rows.add(Lists.newArrayList(selectStmt.getResultExprs()));
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex);
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, realTargetColumnNames);
// rows may be changed in analyzeRow(), so rebuild the result exprs
selectStmt.getResultExprs().clear();
for (Expr expr : rows.get(0)) {
Expand All @@ -651,6 +632,8 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
// INSERT INTO SELECT ... FROM tbl
if (!origColIdxsForExtendCols.isEmpty()) {
// extend the result expr by duplicating the related exprs
Map<String, Expr> slotToIndex = buildSlotToIndex(queryStmt.getResultExprs(), realTargetColumnNames,
analyzer);
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first));
Expand Down Expand Up @@ -680,6 +663,8 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
// expand colLabels in QueryStmt
if (!origColIdxsForExtendCols.isEmpty()) {
if (queryStmt.getResultExprs().size() != queryStmt.getBaseTblResultExprs().size()) {
Map<String, Expr> slotToIndex = buildSlotToIndex(queryStmt.getBaseTblResultExprs(),
realTargetColumnNames, analyzer);
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first));
Expand Down Expand Up @@ -718,8 +703,34 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
}
}

private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<ArrayList<Expr>> rows,
int rowIdx, List<Pair<Integer, Column>> origColIdxsForExtendCols, Map<String, Expr> slotToIndex)
private Map<String, Expr> buildSlotToIndex(ArrayList<Expr> row, List<String> realTargetColumnNames,
Analyzer analyzer) throws AnalysisException {
Map<String, Expr> slotToIndex = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (int i = 0; i < row.size(); i++) {
Expr expr = row.get(i);
expr.analyze(analyzer);
if (expr instanceof DefaultValueExpr || expr instanceof StringLiteral
&& ((StringLiteral) expr).getValue().equals(SelectStmt.DEFAULT_VALUE)) {
continue;
}
expr.analyze(analyzer);
slotToIndex.put(realTargetColumnNames.get(i),
expr.checkTypeCompatibility(targetTable.getColumn(realTargetColumnNames.get(i)).getType()));
}
for (Column column : targetTable.getBaseSchema()) {
if (!slotToIndex.containsKey(column.getName())) {
if (column.getDefaultValue() == null) {
slotToIndex.put(column.getName(), new NullLiteral());
} else {
slotToIndex.put(column.getName(), new StringLiteral(column.getDefaultValue()));
}
}
}
return slotToIndex;
}

private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<ArrayList<Expr>> rows, int rowIdx,
List<Pair<Integer, Column>> origColIdxsForExtendCols, List<String> realTargetColumnNames)
throws AnalysisException {
// 1. check number of fields if equal with first row
// targetColumns contains some shadow columns, which is added by system,
Expand All @@ -729,6 +740,8 @@ private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<Arra
}

ArrayList<Expr> row = rows.get(rowIdx);
Map<String, Expr> slotToIndex = buildSlotToIndex(row, realTargetColumnNames, analyzer);

if (!origColIdxsForExtendCols.isEmpty()) {
/**
* we should extend the row for shadow columns.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_star --
1 1 1 2020-02-02 1
1 2 2 2020-02-02 1

-- !select_mv --
1 1
2 1

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite ("test_insert_multi") {

sql """ DROP TABLE IF EXISTS sales_records; """

sql """
create table sales_records(record_id int, seller_id int, store_id int, sale_date date, sale_amt bigint) distributed by hash(record_id) properties("replication_num" = "1");
"""

createMV ("create materialized view store_amt as select store_id, sum(sale_amt) from sales_records group by store_id;")

sql """insert into sales_records values(1,1,1,"2020-02-02",1),(1,2,2,"2020-02-02",1);"""

qt_select_star "select * from sales_records order by 1,2;"

explain {
sql(" SELECT store_id, sum(sale_amt) FROM sales_records GROUP BY store_id order by 1;")
contains "(store_amt)"
}
qt_select_mv " SELECT store_id, sum(sale_amt) FROM sales_records GROUP BY store_id order by 1;"
}

0 comments on commit 17a92ef

Please sign in to comment.