Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 68 additions & 21 deletions fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
Expand Down Expand Up @@ -427,25 +428,46 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {

/*
* When doing schema change, there may be some shadow columns. we should add
* them to the end of targetColumns. And use 'origColIdxsForShadowCols' to save
* them to the end of targetColumns. And use 'origColIdxsForExtendCols' to save
* the index of column in 'targetColumns' which the shadow column related to.
* eg: origin targetColumns: (A,B,C), shadow column: __doris_shadow_B after
* processing, targetColumns: (A, B, C, __doris_shadow_B), and
* origColIdxsForShadowCols has 1 element: "1", which is the index of column B
* origColIdxsForExtendCols has 1 element: "1", which is the index of column B
* in targetColumns.
*
* Rule A: If the column which the shadow column related to is not mentioned,
* then do not add the shadow column to targetColumns. They will be filled by
* null or default value when loading.
*
* When table have materialized view, there may be some materialized view columns.
* we should add them to the end of targetColumns.
* eg: origin targetColumns: (A,B,C), shadow column: mv_bitmap_union_C
* after processing, targetColumns: (A, B, C, mv_bitmap_union_C), and
* origColIdx2MVColumn has 1 element: "2, mv_bitmap_union_C"
* will be used in as a mapping from queryStmt.getResultExprs() to targetColumns define expr
*/
List<Integer> origColIdxsForShadowCols = Lists.newArrayList();
List<Pair<Integer, Column>> origColIdxsForExtendCols = Lists.newArrayList();
for (Column column : targetTable.getFullSchema()) {
if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
String origName = Column.removeNamePrefix(column.getName());
for (int i = 0; i < targetColumns.size(); i++) {
if (targetColumns.get(i).nameEquals(origName, false)) {
// Rule A
origColIdxsForShadowCols.add(i);
origColIdxsForExtendCols.add(new Pair<>(i, null));
targetColumns.add(column);
break;
}
}
}
if (column.isNameWithPrefix(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PRFIX)) {
SlotRef refColumn = column.getRefColumn();
if (refColumn == null) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, column.getName(), targetTable.getName());
}
String origName = refColumn.getColumnName();
for (int originColumnIdx = 0; originColumnIdx < targetColumns.size(); originColumnIdx++) {
if (targetColumns.get(originColumnIdx).nameEquals(origName, false)) {
origColIdxsForExtendCols.add(new Pair<>(originColumnIdx, column));
targetColumns.add(column);
break;
}
Expand All @@ -472,7 +494,7 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
// INSERT INTO VALUES(...)
List<ArrayList<Expr>> rows = selectStmt.getValueList().getRows();
for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) {
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForShadowCols);
analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols);
}

// clear these 2 structures, rebuild them using VALUES exprs
Expand All @@ -487,7 +509,7 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
// INSERT INTO SELECT 1,2,3 ...
List<ArrayList<Expr>> rows = Lists.newArrayList();
rows.add(selectStmt.getResultExprs());
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForShadowCols);
analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols);
// rows may be changed in analyzeRow(), so rebuild the result exprs
selectStmt.getResultExprs().clear();
for (Expr expr : rows.get(0)) {
Expand All @@ -497,12 +519,22 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
isStreaming = true;
} else {
// INSERT INTO SELECT ... FROM tbl
if (!origColIdxsForShadowCols.isEmpty()) {
if (!origColIdxsForExtendCols.isEmpty()) {
// extend the result expr by duplicating the related exprs
for (Integer idx : origColIdxsForShadowCols) {
queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(idx));
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first));
} else {
//substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
smap.getLhs().add(entry.second.getRefColumn());
smap.getRhs().add(queryStmt.getResultExprs().get(entry.first));
Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), smap, analyzer, false).get(0);
queryStmt.getResultExprs().add(e);
}
}
}

// check compatibility
for (int i = 0; i < targetColumns.size(); ++i) {
Column column = targetColumns.get(i);
Expand All @@ -518,17 +550,26 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
}
}

// expand baseTblResultExprs and colLabels in QueryStmt
if (!origColIdxsForShadowCols.isEmpty()) {
// expand colLabels in QueryStmt
if (!origColIdxsForExtendCols.isEmpty()) {
if (queryStmt.getResultExprs().size() != queryStmt.getBaseTblResultExprs().size()) {
for (Integer idx : origColIdxsForShadowCols) {
queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(idx));
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry.second == null) {
queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first));
} else {
//substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
smap.getLhs().add(entry.second.getRefColumn());
smap.getRhs().add(queryStmt.getResultExprs().get(entry.first));
Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), smap, analyzer, false).get(0);
queryStmt.getBaseTblResultExprs().add(e);
}
}
}

if (queryStmt.getResultExprs().size() != queryStmt.getColLabels().size()) {
for (Integer idx : origColIdxsForShadowCols) {
queryStmt.getColLabels().add(queryStmt.getColLabels().get(idx));
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
queryStmt.getColLabels().add(queryStmt.getColLabels().get(entry.first));
}
}
}
Expand All @@ -547,16 +588,16 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
}

private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<ArrayList<Expr>> rows,
int rowIdx, List<Integer> origColIdxsForShadowCols) throws AnalysisException {
int rowIdx, List<Pair<Integer, Column>> origColIdxsForExtendCols) throws AnalysisException {
// 1. check number of fields if equal with first row
// targetColumns contains some shadow columns, which is added by system,
// so we should minus this
if (rows.get(rowIdx).size() != targetColumns.size() - origColIdxsForShadowCols.size()) {
if (rows.get(rowIdx).size() != targetColumns.size() - origColIdxsForExtendCols.size()) {
throw new AnalysisException("Column count doesn't match value count at row " + (rowIdx + 1));
}

ArrayList<Expr> row = rows.get(rowIdx);
if (!origColIdxsForShadowCols.isEmpty()) {
if (!origColIdxsForExtendCols.isEmpty()) {
/**
* we should extends the row for shadow columns.
* eg:
Expand All @@ -566,14 +607,20 @@ private void analyzeRow(Analyzer analyzer, List<Column> targetColumns, List<Arra
ArrayList<Expr> extentedRow = Lists.newArrayList();
extentedRow.addAll(row);

for (Integer idx : origColIdxsForShadowCols) {
extentedRow.add(extentedRow.get(idx));
for (Pair<Integer, Column> entry : origColIdxsForExtendCols) {
if (entry == null) {
extentedRow.add(extentedRow.get(entry.first));
} else {
ExprSubstitutionMap smap = new ExprSubstitutionMap();
smap.getLhs().add(entry.second.getRefColumn());
smap.getRhs().add(extentedRow.get(entry.first));
extentedRow.add(Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), smap, analyzer, false).get(0));
}
}

row = extentedRow;
rows.set(rowIdx, row);
}

// check the compatibility of expr in row and column in targetColumns
for (int i = 0; i < row.size(); ++i) {
Expr expr = row.get(i);
Expand Down
15 changes: 15 additions & 0 deletions fe/src/main/java/org/apache/doris/catalog/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.doris.catalog;

import com.google.common.base.Preconditions;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
Expand All @@ -37,6 +39,8 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* This class represents the column-related metadata.
Expand Down Expand Up @@ -334,6 +338,17 @@ public void setDefineExpr(Expr expr) {
defineExpr = expr;
}

public SlotRef getRefColumn() {
List<Expr> slots = new ArrayList<>();
if (defineExpr == null) {
return null;
} else {
defineExpr.collect(SlotRef.class, slots);
Preconditions.checkArgument(slots.size() == 1);
return (SlotRef) slots.get(0);
}
}

public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("`").append(name).append("` ");
Expand Down
40 changes: 39 additions & 1 deletion fe/src/main/java/org/apache/doris/load/Load.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.ColumnSeparator;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.DeleteStmt;
Expand Down Expand Up @@ -1054,7 +1055,20 @@ public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
slotDescByName.put(realColName, slotDesc);
}
}
LOG.debug("slotDescByName: {}, exprsByName: {}", slotDescByName, exprsByName);
/*
* The extension column of the materialized view is added to the expression evaluation of load
* To avoid nested expressions. eg : column(a, tmp_c, c = expr(tmp_c)) ,
* __doris_materialized_view_bitmap_union_c need be analyzed after exprsByName
* So the columns of the materialized view are stored separately here
*/
Map<String, Expr> mvDefineExpr = Maps.newHashMap();
for (Column column : tbl.getFullSchema()) {
if (column.getDefineExpr() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that this is correctcolumn(a, tmp_c) set(b=f(c), c=b)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that this is correctcolumn(a, tmp_c) set(b=f(c), c=b)

Fix done

mvDefineExpr.put(column.getName(), column.getDefineExpr());
}
}

LOG.debug("slotDescByName: {}, exprsByName: {}, mvDefineExpr: {}", slotDescByName, exprsByName, mvDefineExpr);

// analyze all exprs
for (Map.Entry<String, Expr> entry : exprsByName.entrySet()) {
Expand Down Expand Up @@ -1083,6 +1097,30 @@ public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
}
exprsByName.put(entry.getKey(), expr);
}

for (Map.Entry<String, Expr> entry : mvDefineExpr.entrySet()) {
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
for (SlotRef slot : slots) {
if (slotDescByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
smap.getRhs().add(new CastExpr(tbl.getColumn(slot.getColumnName()).getType(),
new SlotRef(slotDescByName.get(slot.getColumnName()))));
} else if (exprsByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
smap.getRhs().add(new CastExpr(tbl.getColumn(slot.getColumnName()).getType(),
exprsByName.get(slot.getColumnName())));
} else {
throw new UserException("unknown reference column, column=" + entry.getKey()
+ ", reference=" + slot.getColumnName());
}
}
Expr expr = entry.getValue().clone(smap);
expr.analyze(analyzer);

exprsByName.put(entry.getKey(), expr);
}
LOG.debug("after init column, exprMap: {}", exprsByName);
}

Expand Down
Loading