From a0dadacffef022bb512af468f97fa86864d25b36 Mon Sep 17 00:00:00 2001 From: Lei Zhang <1091517373@qq.com> Date: Sun, 3 Jul 2022 22:54:49 +0800 Subject: [PATCH] [refactor](schema change) refact fe light schema change. --- .../doris/alter/SchemaChangeHandler.java | 180 +++++++++++++++++- .../org/apache/doris/catalog/Catalog.java | 179 ----------------- .../org/apache/doris/persist/EditLog.java | 2 +- .../apache/doris/persist/OperationType.java | 5 +- 4 files changed, 182 insertions(+), 184 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 9de68d4d87ab83..27041e130b3078 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -53,6 +53,7 @@ import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; @@ -70,6 +71,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.RemoveAlterJobV2OperationLog; +import org.apache.doris.persist.TableAddOrDropColumnsInfo; import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; @@ -1727,8 +1729,7 @@ public int getAsInt() { if (ligthSchemaChange) { long jobId = Catalog.getCurrentCatalog().getNextId(); //for schema change add/drop value column optimize, direct modify table meta. - Catalog.getCurrentCatalog() - .modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, jobId, false); + modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, jobId, false); return; } else { createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes); @@ -2084,4 +2085,179 @@ public void replayAlterJobV2(AlterJobV2 alterJob) { } super.replayAlterJobV2(alterJob); } + + // the invoker should keep table's write lock + public void modifyTableAddOrDropColumns(Database db, OlapTable olapTable, + Map> indexSchemaMap, + List indexes, long jobId, boolean isReplay) throws DdlException { + + LOG.debug("indexSchemaMap:{}, indexes:{}", indexSchemaMap, indexes); + if (olapTable.getState() == OlapTableState.ROLLUP) { + throw new DdlException("Table[" + olapTable.getName() + "]'s is doing ROLLUP job"); + } + + // for now table's state can only be NORMAL + Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL, olapTable.getState().name()); + + // for bitmapIndex + boolean hasIndexChange = false; + Set newSet = new HashSet<>(indexes); + Set oriSet = new HashSet<>(olapTable.getIndexes()); + if (!newSet.equals(oriSet)) { + hasIndexChange = true; + } + + // begin checking each table + // ATTN: DO NOT change any meta in this loop + Map> changedIndexIdToSchema = Maps.newHashMap(); + for (Long alterIndexId : indexSchemaMap.keySet()) { + // Must get all columns including invisible columns. + // Because in alter process, all columns must be considered. + List alterSchema = indexSchemaMap.get(alterIndexId); + + LOG.debug("index[{}] is changed. start checking...", alterIndexId); + // 1. check order: a) has key; b) value after key + boolean meetValue = false; + boolean hasKey = false; + for (Column column : alterSchema) { + if (column.isKey() && meetValue) { + throw new DdlException( + "Invalid column order. value should be after key. index[" + olapTable.getIndexNameById( + alterIndexId) + "]"); + } + if (!column.isKey()) { + meetValue = true; + } else { + hasKey = true; + } + } + if (!hasKey) { + throw new DdlException("No key column left. index[" + olapTable.getIndexNameById(alterIndexId) + "]"); + } + + // 2. check partition key + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { + List partitionColumns = partitionInfo.getPartitionColumns(); + for (Column partitionCol : partitionColumns) { + boolean found = false; + for (Column alterColumn : alterSchema) { + if (alterColumn.nameEquals(partitionCol.getName(), true)) { + found = true; + break; + } + } // end for alterColumns + + if (!found && alterIndexId == olapTable.getBaseIndexId()) { + // 2.1 partition column cannot be deleted. + throw new DdlException( + "Partition column[" + partitionCol.getName() + "] cannot be dropped. index[" + + olapTable.getIndexNameById(alterIndexId) + "]"); + } + } // end for partitionColumns + } + + // 3. check distribution key: + DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); + if (distributionInfo.getType() == DistributionInfoType.HASH) { + List distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); + for (Column distributionCol : distributionColumns) { + boolean found = false; + for (Column alterColumn : alterSchema) { + if (alterColumn.nameEquals(distributionCol.getName(), true)) { + found = true; + break; + } + } // end for alterColumns + if (!found && alterIndexId == olapTable.getBaseIndexId()) { + // 2.2 distribution column cannot be deleted. + throw new DdlException( + "Distribution column[" + distributionCol.getName() + "] cannot be dropped. index[" + + olapTable.getIndexNameById(alterIndexId) + "]"); + } + } // end for distributionCols + } + + // 5. store the changed columns for edit log + changedIndexIdToSchema.put(alterIndexId, alterSchema); + + LOG.debug("schema change[{}-{}-{}] check pass.", db.getId(), olapTable.getId(), alterIndexId); + } // end for indices + + if (changedIndexIdToSchema.isEmpty() && !hasIndexChange) { + throw new DdlException("Nothing is changed. please check your alter stmt."); + } + + //update base index schema + long baseIndexId = olapTable.getBaseIndexId(); + List indexIds = new ArrayList(); + indexIds.add(baseIndexId); + indexIds.addAll(olapTable.getIndexIdListExceptBaseIndex()); + for (int i = 0; i < indexIds.size(); i++) { + List indexSchema = indexSchemaMap.get(indexIds.get(i)); + MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(indexIds.get(i)); + currentIndexMeta.setSchema(indexSchema); + + int currentSchemaVersion = currentIndexMeta.getSchemaVersion(); + int newSchemaVersion = currentSchemaVersion + 1; + currentIndexMeta.setSchemaVersion(newSchemaVersion); + // generate schema hash for new index has to generate a new schema hash not equal to current schema hash + int currentSchemaHash = currentIndexMeta.getSchemaHash(); + int newSchemaHash = Util.generateSchemaHash(); + while (currentSchemaHash == newSchemaHash) { + newSchemaHash = Util.generateSchemaHash(); + } + currentIndexMeta.setSchemaHash(newSchemaHash); + } + olapTable.setIndexes(indexes); + olapTable.rebuildFullSchema(); + + //update max column unique id + int maxColUniqueId = olapTable.getMaxColUniqueId(); + for (Column column : indexSchemaMap.get(olapTable.getBaseIndexId())) { + if (column.getUniqueId() > maxColUniqueId) { + maxColUniqueId = column.getUniqueId(); + } + } + olapTable.setMaxColUniqueId(maxColUniqueId); + + if (!isReplay) { + TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(db.getId(), olapTable.getId(), + indexSchemaMap, indexes, jobId); + LOG.debug("logModifyTableAddOrDropColumns info:{}", info); + Catalog.getCurrentCatalog().getEditLog().logModifyTableAddOrDropColumns(info); + } + + //for compatibility, we need create a finished state schema change job v2 + + SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, db.getId(), olapTable.getId(), + olapTable.getName(), 1000); + schemaChangeJob.setJobState(AlterJobV2.JobState.FINISHED); + schemaChangeJob.setFinishedTimeMs(System.currentTimeMillis()); + this.addAlterJobV2(schemaChangeJob); + + LOG.info("finished modify table's add or drop columns. table: {}, is replay: {}", olapTable.getName(), + isReplay); + } + + public void replayModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info) throws MetaNotFoundException { + LOG.debug("info:{}", info); + long dbId = info.getDbId(); + long tableId = info.getTableId(); + Map> indexSchemaMap = info.getIndexSchemaMap(); + List indexes = info.getIndexes(); + long jobId = info.getJobId(); + + Database db = Catalog.getCurrentCatalog().getInternalDataSource().getDbOrMetaException(dbId); + OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP); + olapTable.writeLock(); + try { + modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, indexes, jobId, true); + } catch (DdlException e) { + // should not happen + LOG.warn("failed to replay modify table add or drop columns", e); + } finally { + olapTable.writeUnlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 27e7fd892e6cd8..da02033f7187ab 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -22,7 +22,6 @@ import org.apache.doris.alter.AlterJobV2.JobType; import org.apache.doris.alter.MaterializedViewHandler; import org.apache.doris.alter.SchemaChangeHandler; -import org.apache.doris.alter.SchemaChangeJobV2; import org.apache.doris.alter.SystemHandler; import org.apache.doris.analysis.AddPartitionClause; import org.apache.doris.analysis.AdminCheckTabletsStmt; @@ -184,7 +183,6 @@ import org.apache.doris.persist.Storage; import org.apache.doris.persist.StorageInfo; import org.apache.doris.persist.StorageInfoV2; -import org.apache.doris.persist.TableAddOrDropColumnsInfo; import org.apache.doris.persist.TableInfo; import org.apache.doris.persist.TablePropertyInfo; import org.apache.doris.persist.TruncateTableInfo; @@ -251,8 +249,6 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -4919,181 +4915,6 @@ private static void addTableComment(Table table, StringBuilder sb) { } } - // the invoker should keep table's write lock - public void modifyTableAddOrDropColumns(Database db, OlapTable olapTable, - Map> indexSchemaMap, - List indexes, long jobId, boolean isReplay) throws DdlException { - - LOG.debug("indexSchemaMap:{}, indexes:{}", indexSchemaMap, indexes); - if (olapTable.getState() == OlapTableState.ROLLUP) { - throw new DdlException("Table[" + olapTable.getName() + "]'s is doing ROLLUP job"); - } - - // for now table's state can only be NORMAL - Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL, olapTable.getState().name()); - - // for bitmapIndex - boolean hasIndexChange = false; - Set newSet = new HashSet<>(indexes); - Set oriSet = new HashSet<>(olapTable.getIndexes()); - if (!newSet.equals(oriSet)) { - hasIndexChange = true; - } - - // begin checking each table - // ATTN: DO NOT change any meta in this loop - Map> changedIndexIdToSchema = Maps.newHashMap(); - for (Long alterIndexId : indexSchemaMap.keySet()) { - // Must get all columns including invisible columns. - // Because in alter process, all columns must be considered. - List alterSchema = indexSchemaMap.get(alterIndexId); - - LOG.debug("index[{}] is changed. start checking...", alterIndexId); - // 1. check order: a) has key; b) value after key - boolean meetValue = false; - boolean hasKey = false; - for (Column column : alterSchema) { - if (column.isKey() && meetValue) { - throw new DdlException( - "Invalid column order. value should be after key. index[" + olapTable.getIndexNameById( - alterIndexId) + "]"); - } - if (!column.isKey()) { - meetValue = true; - } else { - hasKey = true; - } - } - if (!hasKey) { - throw new DdlException("No key column left. index[" + olapTable.getIndexNameById(alterIndexId) + "]"); - } - - // 2. check partition key - PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { - List partitionColumns = partitionInfo.getPartitionColumns(); - for (Column partitionCol : partitionColumns) { - boolean found = false; - for (Column alterColumn : alterSchema) { - if (alterColumn.nameEquals(partitionCol.getName(), true)) { - found = true; - break; - } - } // end for alterColumns - - if (!found && alterIndexId == olapTable.getBaseIndexId()) { - // 2.1 partition column cannot be deleted. - throw new DdlException( - "Partition column[" + partitionCol.getName() + "] cannot be dropped. index[" - + olapTable.getIndexNameById(alterIndexId) + "]"); - } - } // end for partitionColumns - } - - // 3. check distribution key: - DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); - if (distributionInfo.getType() == DistributionInfoType.HASH) { - List distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns(); - for (Column distributionCol : distributionColumns) { - boolean found = false; - for (Column alterColumn : alterSchema) { - if (alterColumn.nameEquals(distributionCol.getName(), true)) { - found = true; - break; - } - } // end for alterColumns - if (!found && alterIndexId == olapTable.getBaseIndexId()) { - // 2.2 distribution column cannot be deleted. - throw new DdlException( - "Distribution column[" + distributionCol.getName() + "] cannot be dropped. index[" - + olapTable.getIndexNameById(alterIndexId) + "]"); - } - } // end for distributionCols - } - - // 5. store the changed columns for edit log - changedIndexIdToSchema.put(alterIndexId, alterSchema); - - LOG.debug("schema change[{}-{}-{}] check pass.", db.getId(), olapTable.getId(), alterIndexId); - } // end for indices - - if (changedIndexIdToSchema.isEmpty() && !hasIndexChange) { - throw new DdlException("Nothing is changed. please check your alter stmt."); - } - - //update base index schema - long baseIndexId = olapTable.getBaseIndexId(); - List indexIds = new ArrayList(); - indexIds.add(baseIndexId); - indexIds.addAll(olapTable.getIndexIdListExceptBaseIndex()); - for (int i = 0; i < indexIds.size(); i++) { - List indexSchema = indexSchemaMap.get(indexIds.get(i)); - MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(indexIds.get(i)); - currentIndexMeta.setSchema(indexSchema); - - int currentSchemaVersion = currentIndexMeta.getSchemaVersion(); - int newSchemaVersion = currentSchemaVersion + 1; - currentIndexMeta.setSchemaVersion(newSchemaVersion); - // generate schema hash for new index has to generate a new schema hash not equal to current schema hash - int currentSchemaHash = currentIndexMeta.getSchemaHash(); - int newSchemaHash = Util.generateSchemaHash(); - while (currentSchemaHash == newSchemaHash) { - newSchemaHash = Util.generateSchemaHash(); - } - currentIndexMeta.setSchemaHash(newSchemaHash); - } - olapTable.setIndexes(indexes); - olapTable.rebuildFullSchema(); - - //update max column unique id - int maxColUniqueId = olapTable.getMaxColUniqueId(); - for (Column column : indexSchemaMap.get(olapTable.getBaseIndexId())) { - if (column.getUniqueId() > maxColUniqueId) { - maxColUniqueId = column.getUniqueId(); - } - } - olapTable.setMaxColUniqueId(maxColUniqueId); - - if (!isReplay) { - TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(db.getId(), olapTable.getId(), - indexSchemaMap, indexes, jobId); - LOG.debug("logModifyTableAddOrDropColumns info:{}", info); - editLog.logModifyTableAddOrDropColumns(info); - } - - //for compatibility, we need create a finished state schema change job v2 - - SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, db.getId(), olapTable.getId(), - olapTable.getName(), 1000); - schemaChangeJob.setJobState(AlterJobV2.JobState.FINISHED); - schemaChangeJob.setFinishedTimeMs(System.currentTimeMillis()); - this.getSchemaChangeHandler().addAlterJobV2(schemaChangeJob); - - LOG.info("finished modify table's add or drop columns. table: {}, is replay: {}", olapTable.getName(), - isReplay); - } - - public void replayModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info) throws MetaNotFoundException { - LOG.debug("info:{}", info); - long dbId = info.getDbId(); - long tableId = info.getTableId(); - Map> indexSchemaMap = info.getIndexSchemaMap(); - List indexes = info.getIndexes(); - long jobId = info.getJobId(); - - Database db = getInternalDataSource().getDbOrMetaException(dbId); - OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP); - olapTable.writeLock(); - try { - modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, indexes, jobId, true); - } catch (DdlException e) { - // should not happen - LOG.warn("failed to replay modify table add or drop columns", e); - } finally { - olapTable.writeUnlock(); - } - } - public TableName getTableNameByTableId(Long tableId) { for (String dbName : getInternalDataSource().getDbNames()) { DatabaseIf db = getInternalDataSource().getDbNullable(dbName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index e7e23abe430c55..e4bc3dcb1cd7e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -847,7 +847,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { } case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS: { final TableAddOrDropColumnsInfo info = (TableAddOrDropColumnsInfo) journal.getData(); - catalog.replayModifyTableAddOrDropColumns(info); + catalog.getSchemaChangeHandler().replayModifyTableAddOrDropColumns(info); break; } default: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 2b45c1c0bd3eab..047714d0f070dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -69,6 +69,9 @@ public class OperationType { public static final short OP_MODIFY_COMMENT = 126; public static final short OP_MODIFY_TABLE_ENGINE = 127; + //schema change for add and drop columns + public static final short OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 128; + // 30~39 130~139 230~239 ... // load job for only hadoop load public static final short OP_LOAD_START = 30; @@ -222,8 +225,6 @@ public class OperationType { // policy 310-320 public static final short OP_CREATE_POLICY = 310; public static final short OP_DROP_POLICY = 311; - //schema change for add and drop columns 320-329 - public static final short OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 320; // datasource 312-315 public static final short OP_CREATE_DS = 312;