Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](schema change) refactor FE light schema change. #5

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
Expand All @@ -70,6 +71,7 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
Expand Down Expand Up @@ -1727,8 +1729,7 @@ public int getAsInt() {
if (ligthSchemaChange) {
long jobId = Catalog.getCurrentCatalog().getNextId();
//for schema change add/drop value column optimize, direct modify table meta.
Catalog.getCurrentCatalog()
.modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, jobId, false);
modifyTableAddOrDropColumns(db, olapTable, indexSchemaMap, newIndexes, jobId, false);
return;
} else {
createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes);
Expand Down Expand Up @@ -2084,4 +2085,179 @@ public void replayAlterJobV2(AlterJobV2 alterJob) {
}
super.replayAlterJobV2(alterJob);
}

// Applies a "light" schema change (add/drop columns) by directly rewriting the
// table's index metadata, instead of scheduling a full alter job.
// ATTN: the invoker must hold the table's write lock.
// Throws DdlException when the table is in ROLLUP state, when a key/partition/
// distribution constraint is violated, or when nothing actually changed.
public void modifyTableAddOrDropColumns(Database db, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap,
List<Index> indexes, long jobId, boolean isReplay) throws DdlException {

LOG.debug("indexSchemaMap:{}, indexes:{}", indexSchemaMap, indexes);
if (olapTable.getState() == OlapTableState.ROLLUP) {
throw new DdlException("Table[" + olapTable.getName() + "]'s is doing ROLLUP job");
}

// for now table's state can only be NORMAL
Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL, olapTable.getState().name());

// for bitmapIndex: detect whether the index definitions themselves changed,
// by comparing the new index set against the table's current one.
boolean hasIndexChange = false;
Set<Index> newSet = new HashSet<>(indexes);
Set<Index> oriSet = new HashSet<>(olapTable.getIndexes());
if (!newSet.equals(oriSet)) {
hasIndexChange = true;
}

// begin checking each table
// ATTN: DO NOT change any meta in this loop — validation only; mutation
// happens after all indexes have passed the checks below.
Map<Long, List<Column>> changedIndexIdToSchema = Maps.newHashMap();
for (Long alterIndexId : indexSchemaMap.keySet()) {
// Must get all columns including invisible columns.
// Because in alter process, all columns must be considered.
List<Column> alterSchema = indexSchemaMap.get(alterIndexId);

LOG.debug("index[{}] is changed. start checking...", alterIndexId);
// 1. check order: a) has key; b) value after key
boolean meetValue = false;
boolean hasKey = false;
for (Column column : alterSchema) {
if (column.isKey() && meetValue) {
throw new DdlException(
"Invalid column order. value should be after key. index[" + olapTable.getIndexNameById(
alterIndexId) + "]");
}
if (!column.isKey()) {
meetValue = true;
} else {
hasKey = true;
}
}
if (!hasKey) {
throw new DdlException("No key column left. index[" + olapTable.getIndexNameById(alterIndexId) + "]");
}

// 2. check partition key: partition columns of the base index must survive
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
List<Column> partitionColumns = partitionInfo.getPartitionColumns();
for (Column partitionCol : partitionColumns) {
boolean found = false;
for (Column alterColumn : alterSchema) {
if (alterColumn.nameEquals(partitionCol.getName(), true)) {
found = true;
break;
}
} // end for alterColumns

if (!found && alterIndexId == olapTable.getBaseIndexId()) {
// 2.1 partition column cannot be deleted.
throw new DdlException(
"Partition column[" + partitionCol.getName() + "] cannot be dropped. index["
+ olapTable.getIndexNameById(alterIndexId) + "]");
}
} // end for partitionColumns
}

// 3. check distribution key: hash-distribution columns of the base index must survive
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
if (distributionInfo.getType() == DistributionInfoType.HASH) {
List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
for (Column distributionCol : distributionColumns) {
boolean found = false;
for (Column alterColumn : alterSchema) {
if (alterColumn.nameEquals(distributionCol.getName(), true)) {
found = true;
break;
}
} // end for alterColumns
if (!found && alterIndexId == olapTable.getBaseIndexId()) {
// 2.2 distribution column cannot be deleted.
throw new DdlException(
"Distribution column[" + distributionCol.getName() + "] cannot be dropped. index["
+ olapTable.getIndexNameById(alterIndexId) + "]");
}
} // end for distributionCols
}

// 5. store the changed columns for edit log
changedIndexIdToSchema.put(alterIndexId, alterSchema);

LOG.debug("schema change[{}-{}-{}] check pass.", db.getId(), olapTable.getId(), alterIndexId);
} // end for indices

if (changedIndexIdToSchema.isEmpty() && !hasIndexChange) {
throw new DdlException("Nothing is changed. please check your alter stmt.");
}

//update base index schema
// NOTE(review): the loop below iterates base index first, then the others;
// base-first ordering appears intentional — confirm before reordering.
long baseIndexId = olapTable.getBaseIndexId();
List<Long> indexIds = new ArrayList<Long>();
indexIds.add(baseIndexId);
indexIds.addAll(olapTable.getIndexIdListExceptBaseIndex());
for (int i = 0; i < indexIds.size(); i++) {
List<Column> indexSchema = indexSchemaMap.get(indexIds.get(i));
MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(indexIds.get(i));
currentIndexMeta.setSchema(indexSchema);

int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
int newSchemaVersion = currentSchemaVersion + 1;
currentIndexMeta.setSchemaVersion(newSchemaVersion);
// generate schema hash for new index has to generate a new schema hash not equal to current schema hash
int currentSchemaHash = currentIndexMeta.getSchemaHash();
int newSchemaHash = Util.generateSchemaHash();
while (currentSchemaHash == newSchemaHash) {
newSchemaHash = Util.generateSchemaHash();
}
currentIndexMeta.setSchemaHash(newSchemaHash);
}
olapTable.setIndexes(indexes);
olapTable.rebuildFullSchema();

//update max column unique id: scan the (possibly extended) base schema for
// the largest column unique id and record it on the table.
int maxColUniqueId = olapTable.getMaxColUniqueId();
for (Column column : indexSchemaMap.get(olapTable.getBaseIndexId())) {
if (column.getUniqueId() > maxColUniqueId) {
maxColUniqueId = column.getUniqueId();
}
}
olapTable.setMaxColUniqueId(maxColUniqueId);

// Persist the change to the edit log, but only on the original execution
// path — replaying an edit-log entry must not write a new one.
if (!isReplay) {
TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(db.getId(), olapTable.getId(),
indexSchemaMap, indexes, jobId);
LOG.debug("logModifyTableAddOrDropColumns info:{}", info);
Catalog.getCurrentCatalog().getEditLog().logModifyTableAddOrDropColumns(info);
}

//for compatibility, we need create a finished state schema change job v2

SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, db.getId(), olapTable.getId(),
olapTable.getName(), 1000);
schemaChangeJob.setJobState(AlterJobV2.JobState.FINISHED);
schemaChangeJob.setFinishedTimeMs(System.currentTimeMillis());
this.addAlterJobV2(schemaChangeJob);

LOG.info("finished modify table's add or drop columns. table: {}, is replay: {}", olapTable.getName(),
isReplay);
}

// Replays a persisted light schema change (add/drop columns) from the edit log.
// Looks up the target database/table from the log record and re-applies the
// saved index schemas under the table's write lock.
public void replayModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info) throws MetaNotFoundException {
LOG.debug("info:{}", info);
Database db = Catalog.getCurrentCatalog().getInternalDataSource().getDbOrMetaException(info.getDbId());
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
Map<Long, LinkedList<Column>> schemaByIndexId = info.getIndexSchemaMap();
List<Index> tableIndexes = info.getIndexes();
olapTable.writeLock();
try {
// isReplay = true: apply the change without writing a new edit-log entry.
modifyTableAddOrDropColumns(db, olapTable, schemaByIndexId, tableIndexes, info.getJobId(), true);
} catch (DdlException e) {
// should not happen
LOG.warn("failed to replay modify table add or drop columns", e);
} finally {
olapTable.writeUnlock();
}
}
}
Loading