Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions fe/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void start() {
clusterHandler.start();
}

public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) throws DdlException, AnalysisException {
public void processCreateMaterializedView(CreateMaterializedViewStmt stmt)
throws DdlException, AnalysisException {
String tableName = stmt.getBaseIndexName();
// check db
String dbName = stmt.getDBName();
Expand All @@ -102,7 +103,8 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt stmt) throw
OlapTable olapTable = (OlapTable) table;
olapTable.checkStableAndNormal(db.getClusterName());

((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db, olapTable);
((MaterializedViewHandler)materializedViewHandler).processCreateMaterializedView(stmt, db,
olapTable);
} finally {
db.writeUnlock();
}
Expand Down
36 changes: 27 additions & 9 deletions fe/src/main/java/org/apache/doris/alter/AlterJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -59,17 +62,27 @@ public enum JobType {
ROLLUP, SCHEMA_CHANGE
}

@SerializedName(value = "type")
protected JobType type;
@SerializedName(value = "jobId")
protected long jobId;
@SerializedName(value = "jobState")
protected JobState jobState;

@SerializedName(value = "dbId")
protected long dbId;
@SerializedName(value = "tableId")
protected long tableId;
@SerializedName(value = "tableName")
protected String tableName;

@SerializedName(value = "errMsg")
protected String errMsg = "";
@SerializedName(value = "createTimeMs")
protected long createTimeMs = -1;
@SerializedName(value = "finishedTimeMs")
protected long finishedTimeMs = -1;
@SerializedName(value = "timeoutMs")
protected long timeoutMs = -1;

public AlterJobV2(long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) {
Expand Down Expand Up @@ -220,15 +233,20 @@ protected boolean checkTableStable(Database db) throws AlterCancelException {
public abstract void replay(AlterJobV2 replayedJob);

public static AlterJobV2 read(DataInput in) throws IOException {
JobType type = JobType.valueOf(Text.readString(in));
switch (type) {
case ROLLUP:
return RollupJobV2.read(in);
case SCHEMA_CHANGE:
return SchemaChangeJobV2.read(in);
default:
Preconditions.checkState(false);
return null;
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_86) {
JobType type = JobType.valueOf(Text.readString(in));
switch (type) {
case ROLLUP:
return RollupJobV2.read(in);
case SCHEMA_CHANGE:
return SchemaChangeJobV2.read(in);
default:
Preconditions.checkState(false);
return null;
}
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, AlterJobV2.class);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;

Expand Down Expand Up @@ -89,7 +90,6 @@
public class MaterializedViewHandler extends AlterHandler {
private static final Logger LOG = LogManager.getLogger(MaterializedViewHandler.class);
public static final String NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX = "__v2_";
public static final String MATERIALIZED_VIEW_NAME_PRFIX = "__doris_materialized_view_";

public MaterializedViewHandler() {
super("materialized view");
Expand Down Expand Up @@ -199,7 +199,7 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause

// Step2: create mv job
RollupJobV2 rollupJobV2 = createMaterializedViewJob(mvIndexName, baseIndexName, mvColumns, addMVClause
.getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType());
.getProperties(), olapTable, db, baseIndexId, addMVClause.getMVKeysType(), addMVClause.getOrigStmt());

addAlterJobV2(rollupJobV2);

Expand Down Expand Up @@ -263,7 +263,7 @@ public void processBatchAddRollup(List<AlterClause> alterClauses, Database db, O

// step 3 create rollup job
RollupJobV2 alterJobV2 = createMaterializedViewJob(rollupIndexName, baseIndexName, rollupSchema, addRollupClause.getProperties(),
olapTable, db, baseIndexId, olapTable.getKeysType());
olapTable, db, baseIndexId, olapTable.getKeysType(), null);

rollupNameJobMap.put(addRollupClause.getRollupName(), alterJobV2);
logJobIdSet.add(alterJobV2.getJobId());
Expand Down Expand Up @@ -312,7 +312,7 @@ public void processBatchAddRollup(List<AlterClause> alterClauses, Database db, O
*/
private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexName,
List<Column> mvColumns, Map<String, String> properties, OlapTable
olapTable, Database db, long baseIndexId, KeysType mvKeysType)
olapTable, Database db, long baseIndexId, KeysType mvKeysType, OriginStatement origStmt)
throws DdlException, AnalysisException {
if (mvKeysType == null) {
// assign rollup index's key type, same as base index's
Expand All @@ -336,7 +336,7 @@ private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexNam
RollupJobV2 mvJob = new RollupJobV2(jobId, dbId, tableId, olapTable.getName(), timeoutMs,
baseIndexId, mvIndexId, baseIndexName, mvName,
mvColumns, baseSchemaHash, mvSchemaHash,
mvKeysType, mvShortKeyColumnCount);
mvKeysType, mvShortKeyColumnCount, origStmt);
String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName();
if (mvName.equals(newStorageFormatIndexName)) {
mvJob.setStorageFormat(TStorageFormat.V2);
Expand Down Expand Up @@ -464,10 +464,8 @@ private List<Column> checkAndPrepareMaterializedView(CreateMaterializedViewStmt
if (mvColumnItem.getDefineExpr() != null) {
if (mvAggregationType.equals(AggregateType.BITMAP_UNION)) {
newMVColumn.setType(Type.BITMAP);
newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "bitmap_" + baseColumn.getName());
} else if (mvAggregationType.equals(AggregateType.HLL_UNION)){
newMVColumn.setType(Type.HLL);
newMVColumn.setName(MATERIALIZED_VIEW_NAME_PRFIX + "hll_" + baseColumn.getName());
} else {
throw new DdlException("The define expr of column is only support bitmap_union or hll_union");
}
Expand Down
Loading