4 changes: 4 additions & 0 deletions fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -537,6 +537,10 @@ public KeysType getKeysType() {
return keysType;
}

public KeysType getKeysTypeByIndexId(long indexId) {
return indexIdToMeta.get(indexId).getKeysType();
}

public PartitionInfo getPartitionInfo() {
return partitionInfo;
}
6 changes: 6 additions & 0 deletions fe/src/main/java/org/apache/doris/common/Pair.java
@@ -17,15 +17,21 @@

package org.apache.doris.common;

import com.google.gson.annotations.SerializedName;

import java.util.Comparator;

/**
* The equivalent of C++'s std::pair<>.
*
* Notice: When using Pair for persistence, users need to guarantee that F and S can be serialized through Gson
*/
public class Pair<F, S> {
public static PairComparator<Pair<?, Comparable>> PAIR_VALUE_COMPARATOR = new PairComparator<>();

@SerializedName(value = "first")
Contributor

I'm not sure this is OK, because there is no guarantee that the F and S objects can also be serialized by Gson.

Contributor Author

Users guarantee this when they use the Pair class, the same as with Map and List.

Contributor

I checked; this does not work.

Contributor Author

I added a comment: "When using Pair for persistence, users need to guarantee that F and S can be serialized through Gson." (A sketch illustrating this follows the diff of this file below.)

public F first;
@SerializedName(value = "second")
public S second;

public Pair(F first, S second) {
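A minimal sketch of the concern in the thread above, assuming Gson 2.x and the Pair class from this diff on the classpath (the example values are illustrative): serialization works whenever F and S are themselves Gson-serializable, but deserialization needs a TypeToken to recover the generic types.

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.doris.common.Pair;

import java.lang.reflect.Type;

public class PairGsonSketch {
    public static void main(String[] args) {
        Gson gson = new Gson();

        // Works: String and Long are trivially Gson-serializable.
        Pair<String, Long> pair = new Pair<>("tableId", 10001L);
        String json = gson.toJson(pair); // {"first":"tableId","second":10001}

        // A TypeToken is needed to restore the generic types; with plain
        // Pair.class, Gson would read "second" back as a Double instead.
        Type type = new TypeToken<Pair<String, Long>>() {}.getType();
        Pair<String, Long> restored = gson.fromJson(json, type);
        System.out.println(restored.first + " = " + restored.second);
    }
}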
412 changes: 384 additions & 28 deletions fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -42,6 +42,7 @@
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
@@ -499,6 +500,11 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
case OperationType.OP_UPDATE_LOAD_JOB: {
data = LoadJobStateUpdateInfo.read(in);
isRead = true;
break;
}
case OperationType.OP_CREATE_RESOURCE: {
data = Resource.read(in);
isRead = true;
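The new OP_UPDATE_LOAD_JOB case follows the journal convention used by the surrounding branches: each entity persisted in the edit log exposes a static read(DataInput) that mirrors its write(DataOutput). A self-contained toy illustration of that convention — the class and field names here are invented, not the PR's actual LoadJobStateUpdateInfo:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class StateUpdateInfo {
    long jobId;
    String state;

    void write(DataOutput out) throws IOException {
        out.writeLong(jobId);
        out.writeUTF(state);
    }

    // Mirrors write(): reads the fields back in the same order.
    static StateUpdateInfo read(DataInput in) throws IOException {
        StateUpdateInfo info = new StateUpdateInfo();
        info.jobId = in.readLong();
        info.state = in.readUTF();
        return info;
    }

    // Round-trip demo: write to a byte buffer, then read back.
    public static void main(String[] args) throws IOException {
        StateUpdateInfo info = new StateUpdateInfo();
        info.jobId = 42L;
        info.state = "LOADING";
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        info.write(new DataOutputStream(buf));
        StateUpdateInfo copy = StateUpdateInfo.read(
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy.jobId + " " + copy.state);
    }
}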
128 changes: 80 additions & 48 deletions fe/src/main/java/org/apache/doris/load/Load.java
@@ -864,21 +864,91 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip
}
}

/**
* When doing schema change, there may be some 'shadow' columns with the prefix '__doris_shadow_' in
* their names. These columns are invisible to the user, but we need to generate data for them,
* so we add column mappings for these columns.
* eg1:
* base schema is (A, B, C), and B is under schema change, so there will be a shadow column '__doris_shadow_B'.
* The final column mapping should look like: (A, B, C, __doris_shadow_B = substitute(B));
*/
public static List<ImportColumnDesc> getSchemaChangeShadowColumnDesc(Table tbl, Map<String, Expr> columnExprMap) {
List<ImportColumnDesc> shadowColumnDescs = Lists.newArrayList();
for (Column column : tbl.getFullSchema()) {
if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
continue;
}

String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX);
if (columnExprMap.containsKey(originCol)) {
Expr mappingExpr = columnExprMap.get(originCol);
if (mappingExpr != null) {
/*
* eg:
* (A, C) SET (B = func(xx))
* ->
* (A, C) SET (B = func(xx), __doris_shadow_B = func(xx))
*/
ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr);
shadowColumnDescs.add(importColumnDesc);
} else {
/*
* eg:
* (A, B, C)
* ->
* (A, B, C) SET (__doris_shadow_B = B)
*/
ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(),
new SlotRef(null, originCol));
shadowColumnDescs.add(importColumnDesc);
}
} else {
/*
* If the user does not specify the related origin column, eg:
* COLUMNS (A, C) where B is not specified but B is being modified, there is a shadow column '__doris_shadow_B'.
* We cannot just add a mapping "__doris_shadow_B = substitute(B)", because Doris cannot find column B.
* In this case, __doris_shadow_B can use its default value, so there is no need to add it to the column mapping.
*/
// do nothing
}
}
return shadowColumnDescs;
}
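
A self-contained toy model of the mapping this helper produces; the '__doris_shadow_' prefix and the (A, B, C) example come from the Javadoc above, while the String stand-ins for exprs and schema are illustrative only:

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShadowColumnSketch {
    static final String SHADOW_PREFIX = "__doris_shadow_";

    public static void main(String[] args) {
        // The full schema contains a shadow column while B is under schema change.
        List<String> fullSchema = Arrays.asList("A", "B", "C", SHADOW_PREFIX + "B");
        // User mapping: (A, B, C) SET (B = func(xx)); null means "no mapping expr".
        Map<String, String> columnExprMap = new HashMap<>();
        columnExprMap.put("A", null);
        columnExprMap.put("B", "func(xx)");
        columnExprMap.put("C", null);

        Map<String, String> shadowDescs = new LinkedHashMap<>();
        for (String col : fullSchema) {
            if (!col.startsWith(SHADOW_PREFIX)) {
                continue;
            }
            String originCol = col.substring(SHADOW_PREFIX.length());
            if (columnExprMap.containsKey(originCol)) {
                String expr = columnExprMap.get(originCol);
                // Reuse the user's mapping expr if present, else copy the origin column.
                shadowDescs.put(col, expr != null ? expr : originCol);
            }
            // If the origin column is absent, the shadow column keeps its default value.
        }
        System.out.println(shadowDescs); // {__doris_shadow_B=func(xx)}
    }
}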

/*
* Used for spark load jobs.
* Does not init slot descs or analyze exprs.
*/
public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
Map<String, Pair<String, List<String>>> columnToHadoopFunction) throws UserException {
initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, false);
}

/*
* This function should be used for broker load v2 and stream load.
* It must be called under the same db lock when planning.
*/
public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
Map<String, Pair<String, List<String>>> columnToHadoopFunction,
Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params) throws UserException {
initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, analyzer,
srcTupleDesc, slotDescByName, params, true);
}

/*
* This function does the following:
* 1. Fill the column exprs if the user does not specify any column or column mapping.
* 2. For unspecified columns, check whether they have a default value.
* 3. Add shadow columns if any.
* 4. Validate hadoop functions.
* 5. Init slot descs and the expr map for the load plan.
*
* This function should be used for broker load v2 and stream load.
* It must be called under the same db lock when planning.
* (A generic sketch of this overload-plus-flag pattern follows this file's diff.)
*/
public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
Map<String, Pair<String, List<String>>> columnToHadoopFunction,
Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params) throws UserException {
Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params,
boolean needInitSlotAndAnalyzeExprs) throws UserException {
// check mapping column exist in schema
// !! all column mappings are in columnExprs !!
for (ImportColumnDesc importColumnDesc : columnExprs) {
Expand Down Expand Up @@ -925,50 +995,8 @@ public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
throw new DdlException("Column has no default value. column: " + columnName);
}

// When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in
// their names. These columns are invisible to user, but we need to generate data for these columns.
// So we add column mappings for these column.
// eg1:
// base schema is (A, B, C), and B is under schema change, so there will be a shadow column: '__doris_shadow_B'
// So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B));
for (Column column : tbl.getFullSchema()) {
if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
continue;
}

String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX);
if (columnExprMap.containsKey(originCol)) {
Expr mappingExpr = columnExprMap.get(originCol);
if (mappingExpr != null) {
/*
* eg:
* (A, C) SET (B = func(xx))
* ->
* (A, C) SET (B = func(xx), __doris_shadow_B = func(xx))
*/
ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr);
copiedColumnExprs.add(importColumnDesc);
} else {
/*
* eg:
* (A, B, C)
* ->
* (A, B, C) SET (__doris_shadow_B = B)
*/
ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(),
new SlotRef(null, originCol));
copiedColumnExprs.add(importColumnDesc);
}
} else {
/*
* There is a case that if user does not specify the related origin column, eg:
* COLUMNS (A, C), and B is not specified, but B is being modified so there is a shadow column '__doris_shadow_B'.
* We can not just add a mapping function "__doris_shadow_B = substitute(B)", because Doris can not find column B.
* In this case, __doris_shadow_B can use its default value, so no need to add it to column mapping
*/
// do nothing
}
}
// get shadow column descs for any columns under schema change
copiedColumnExprs.addAll(getSchemaChangeShadowColumnDesc(tbl, columnExprMap));

// validate hadoop functions
if (columnToHadoopFunction != null) {
Expand All @@ -991,6 +1019,10 @@ public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
}
}

if (!needInitSlotAndAnalyzeExprs) {
return;
}

// init slot desc add expr map, also transform hadoop functions
for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
// make column name case match with real column name
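As referenced in the function comment above, here is a generic, self-contained sketch of the overload-plus-flag pattern this diff applies to initColumns; all names below are illustrative, not Doris's actual API.

import java.util.List;

class InitColumnsPattern {
    // Spark load: only fill and validate the column exprs (steps 1-4).
    static void initColumnsForSparkLoad(List<String> columnExprs) {
        initColumns(columnExprs, false);
    }

    // Broker load v2 / stream load: also init slots and analyze exprs (step 5),
    // so the caller must hold the db lock while planning.
    static void initColumnsForPlanning(List<String> columnExprs) {
        initColumns(columnExprs, true);
    }

    private static void initColumns(List<String> columnExprs,
            boolean needInitSlotAndAnalyzeExprs) {
        // steps 1-4: fill defaults, add shadow columns, validate functions ...
        if (!needInitSlotAndAnalyzeExprs) {
            return; // spark load stops here
        }
        // step 5: init slot descriptors and the expr map for the load plan ...
    }
}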
@@ -75,7 +75,7 @@ private void getAllFileStatus() throws UserException {
long groupFileSize = 0;
List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
for (String path : fileGroup.getFilePaths()) {
BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);
BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
}
fileStatusList.add(fileStatuses);
for (TBrokerFileStatus fstatus : fileStatuses) {