Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.SchemaVersionAndHash;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
Expand Down Expand Up @@ -85,6 +85,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
@SerializedName(value = "partitionIndexTabletMap")
private Table<Long, Long, Map<Long, Long>> partitionIndexTabletMap = HashBasedTable.create();
// partition id -> (shadow index id -> shadow index))
@SerializedName(value = "partitionIndexMap")
private Table<Long, Long, MaterializedIndex> partitionIndexMap = HashBasedTable.create();
// shadow index id -> origin index id
@SerializedName(value = "indexIdMap")
Expand All @@ -97,7 +98,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
private Map<Long, List<Column>> indexSchemaMap = Maps.newHashMap();
// shadow index id -> (shadow index schema version : schema hash)
@SerializedName(value = "indexSchemaVersionAndHashMap")
private Map<Long, Pair<Integer, Integer>> indexSchemaVersionAndHashMap = Maps.newHashMap();
private Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap = Maps.newHashMap();
// shadow index id -> shadow index short key count
@SerializedName(value = "indexShortKeyMap")
private Map<Long, Short> indexShortKeyMap = Maps.newHashMap();
Expand Down Expand Up @@ -151,7 +152,7 @@ public void addIndexSchema(long shadowIdxId, long originIdxId,
short shadowIdxShortKeyCount, List<Column> shadowIdxSchema) {
indexIdMap.put(shadowIdxId, originIdxId);
indexIdToName.put(shadowIdxId, shadowIndexName);
indexSchemaVersionAndHashMap.put(shadowIdxId, Pair.create(shadowSchemaVersion, shadowSchemaHash));
indexSchemaVersionAndHashMap.put(shadowIdxId, new SchemaVersionAndHash(shadowSchemaVersion, shadowSchemaHash));
indexShortKeyMap.put(shadowIdxId, shadowIdxShortKeyCount);
indexSchemaMap.put(shadowIdxId, shadowIdxSchema);
}
Expand Down Expand Up @@ -234,7 +235,7 @@ protected void runPendingJob() throws AlterCancelException {

short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId);
List<Column> shadowSchema = indexSchemaMap.get(shadowIdxId);
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).second;
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));

for (Tablet shadowTablet : shadowIdx.getTablets()) {
Expand Down Expand Up @@ -334,8 +335,8 @@ private void addShadowIndexToCatalog(OlapTable tbl) {

for (long shadowIdxId : indexIdMap.keySet()) {
tbl.setIndexMeta(shadowIdxId, indexIdToName.get(shadowIdxId), indexSchemaMap.get(shadowIdxId),
indexSchemaVersionAndHashMap.get(shadowIdxId).first,
indexSchemaVersionAndHashMap.get(shadowIdxId).second,
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion,
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash,
indexShortKeyMap.get(shadowIdxId), TStorageType.COLUMN, null);
}

Expand Down Expand Up @@ -385,7 +386,7 @@ protected void runWaitingTxnJob() throws AlterCancelException {
MaterializedIndex shadowIdx = entry.getValue();

long originIdxId = indexIdMap.get(shadowIdxId);
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).second;
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));

for (Tablet shadowTablet : shadowIdx.getTablets()) {
Expand Down Expand Up @@ -679,7 +680,7 @@ private void replayPending(SchemaChangeJobV2 replayedJob) {

TStorageMedium medium = tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId,
indexSchemaVersionAndHashMap.get(shadowIndexId).second, medium);
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium);

for (Tablet shadownTablet : shadowIndex.getTablets()) {
invertedIndex.addTablet(shadownTablet.getId(), shadowTabletMeta);
Expand Down Expand Up @@ -867,7 +868,7 @@ private void readJobNotFinishData(DataInput in) throws IOException {
}
int schemaVersion = in.readInt();
int schemaVersionHash = in.readInt();
Pair<Integer, Integer> schemaVersionAndHash = Pair.create(schemaVersion, schemaVersionHash);
SchemaVersionAndHash schemaVersionAndHash = new SchemaVersionAndHash(schemaVersion, schemaVersionHash);
short shortKeyCount = in.readShort();

indexIdMap.put(shadowIndexId, originIndexId);
Expand Down Expand Up @@ -923,7 +924,7 @@ private void readJobFinishedData(DataInput in) throws IOException {
String indexName = Text.readString(in);
int schemaVersion = in.readInt();
int schemaVersionHash = in.readInt();
Pair<Integer, Integer> schemaVersionAndHash = Pair.create(schemaVersion, schemaVersionHash);
SchemaVersionAndHash schemaVersionAndHash = new SchemaVersionAndHash(schemaVersion, schemaVersionHash);

indexIdMap.put(shadowIndexId, originIndexId);
indexIdToName.put(shadowIndexId, indexName);
Expand Down
61 changes: 61 additions & 0 deletions fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common;

import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
* Currently just used for persisting schema version and schema hash pair
* using GSON
*/
public class SchemaVersionAndHash implements Writable {

@SerializedName(value = "version")
public int schemaVersion;
@SerializedName(value = "hash")
public int schemaHash;

public SchemaVersionAndHash(int schemaVersion, int schemaHash) {
this.schemaVersion = schemaVersion;
this.schemaHash = schemaHash;
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}

@Override
public String toString() {
return schemaVersion + ":" + schemaHash;
}

public static SchemaVersionAndHash read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, SchemaVersionAndHash.class);
}
}
3 changes: 2 additions & 1 deletion fe/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSearchDesc;
import org.apache.doris.catalog.Resource;
import org.apache.doris.cluster.BaseParam;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.Config;
Expand Down Expand Up @@ -779,6 +779,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
}
} catch (Exception e) {
LOG.error("Operation Type {}", opCode, e);
System.exit(-1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.SchemaVersionAndHash;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.meta.MetaContext;
Expand All @@ -62,6 +63,8 @@
import org.apache.doris.transaction.FakeTransactionIDGenerator;
import org.apache.doris.transaction.GlobalTransactionMgr;

import com.google.common.collect.Maps;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -393,7 +396,9 @@ public void testSerializeOfSchemaChangeJob() throws IOException {
SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1,1, "test",600000);
schemaChangeJobV2.setStorageFormat(TStorageFormat.V2);
Deencapsulation.setField(schemaChangeJobV2, "jobState", AlterJobV2.JobState.FINISHED);

Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap = Maps.newHashMap();
indexSchemaVersionAndHashMap.put(Long.valueOf(1000), new SchemaVersionAndHash(10, 20));
Deencapsulation.setField(schemaChangeJobV2, "indexSchemaVersionAndHashMap", indexSchemaVersionAndHashMap);

// write schema change job
schemaChangeJobV2.write(out);
Expand All @@ -410,5 +415,12 @@ public void testSerializeOfSchemaChangeJob() throws IOException {
Assert.assertEquals(1, result.getJobId());
Assert.assertEquals(AlterJobV2.JobState.FINISHED, result.getJobState());
Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat"));

Assert.assertNotNull(Deencapsulation.getField(result, "partitionIndexMap"));
Assert.assertNotNull(Deencapsulation.getField(result, "partitionIndexTabletMap"));

Map<Long, SchemaVersionAndHash> map = Deencapsulation.getField(result, "indexSchemaVersionAndHashMap");
Assert.assertEquals(10, map.get(1000L).schemaVersion);
Assert.assertEquals(20, map.get(1000L).schemaHash);
}
}