diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 08838eb3dc116d..bdede94e30a53a 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -35,7 +35,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MarkedCountDownLatch; -import org.apache.doris.common.Pair; +import org.apache.doris.common.SchemaVersionAndHash; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; @@ -85,6 +85,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { @SerializedName(value = "partitionIndexTabletMap") private Table> partitionIndexTabletMap = HashBasedTable.create(); // partition id -> (shadow index id -> shadow index)) + @SerializedName(value = "partitionIndexMap") private Table partitionIndexMap = HashBasedTable.create(); // shadow index id -> origin index id @SerializedName(value = "indexIdMap") @@ -97,7 +98,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { private Map> indexSchemaMap = Maps.newHashMap(); // shadow index id -> (shadow index schema version : schema hash) @SerializedName(value = "indexSchemaVersionAndHashMap") - private Map> indexSchemaVersionAndHashMap = Maps.newHashMap(); + private Map indexSchemaVersionAndHashMap = Maps.newHashMap(); // shadow index id -> shadow index short key count @SerializedName(value = "indexShortKeyMap") private Map indexShortKeyMap = Maps.newHashMap(); @@ -151,7 +152,7 @@ public void addIndexSchema(long shadowIdxId, long originIdxId, short shadowIdxShortKeyCount, List shadowIdxSchema) { indexIdMap.put(shadowIdxId, originIdxId); indexIdToName.put(shadowIdxId, shadowIndexName); - indexSchemaVersionAndHashMap.put(shadowIdxId, Pair.create(shadowSchemaVersion, shadowSchemaHash)); + indexSchemaVersionAndHashMap.put(shadowIdxId, new SchemaVersionAndHash(shadowSchemaVersion, shadowSchemaHash)); indexShortKeyMap.put(shadowIdxId, shadowIdxShortKeyCount); indexSchemaMap.put(shadowIdxId, shadowIdxSchema); } @@ -234,7 +235,7 @@ protected void runPendingJob() throws AlterCancelException { short shadowShortKeyColumnCount = indexShortKeyMap.get(shadowIdxId); List shadowSchema = indexSchemaMap.get(shadowIdxId); - int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).second; + int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash; int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId)); for (Tablet shadowTablet : shadowIdx.getTablets()) { @@ -334,8 +335,8 @@ private void addShadowIndexToCatalog(OlapTable tbl) { for (long shadowIdxId : indexIdMap.keySet()) { tbl.setIndexMeta(shadowIdxId, indexIdToName.get(shadowIdxId), indexSchemaMap.get(shadowIdxId), - indexSchemaVersionAndHashMap.get(shadowIdxId).first, - indexSchemaVersionAndHashMap.get(shadowIdxId).second, + indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion, + indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash, indexShortKeyMap.get(shadowIdxId), TStorageType.COLUMN, null); } @@ -385,7 +386,7 @@ protected void runWaitingTxnJob() throws AlterCancelException { MaterializedIndex shadowIdx = entry.getValue(); long originIdxId = indexIdMap.get(shadowIdxId); - int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).second; + int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash; int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId)); for (Tablet shadowTablet : shadowIdx.getTablets()) { @@ -679,7 +680,7 @@ private void replayPending(SchemaChangeJobV2 replayedJob) { TStorageMedium medium = tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium(); TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId, - indexSchemaVersionAndHashMap.get(shadowIndexId).second, medium); + indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium); for (Tablet shadownTablet : shadowIndex.getTablets()) { invertedIndex.addTablet(shadownTablet.getId(), shadowTabletMeta); @@ -867,7 +868,7 @@ private void readJobNotFinishData(DataInput in) throws IOException { } int schemaVersion = in.readInt(); int schemaVersionHash = in.readInt(); - Pair schemaVersionAndHash = Pair.create(schemaVersion, schemaVersionHash); + SchemaVersionAndHash schemaVersionAndHash = new SchemaVersionAndHash(schemaVersion, schemaVersionHash); short shortKeyCount = in.readShort(); indexIdMap.put(shadowIndexId, originIndexId); @@ -923,7 +924,7 @@ private void readJobFinishedData(DataInput in) throws IOException { String indexName = Text.readString(in); int schemaVersion = in.readInt(); int schemaVersionHash = in.readInt(); - Pair schemaVersionAndHash = Pair.create(schemaVersion, schemaVersionHash); + SchemaVersionAndHash schemaVersionAndHash = new SchemaVersionAndHash(schemaVersion, schemaVersionHash); indexIdMap.put(shadowIndexId, originIndexId); indexIdToName.put(shadowIndexId, indexName); diff --git a/fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java b/fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java new file mode 100644 index 00000000000000..7932d52c923dd3 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/* + * Currently just used for persisting schema version and schema hash pair + * using GSON + */ +public class SchemaVersionAndHash implements Writable { + + @SerializedName(value = "version") + public int schemaVersion; + @SerializedName(value = "hash") + public int schemaHash; + + public SchemaVersionAndHash(int schemaVersion, int schemaHash) { + this.schemaVersion = schemaVersion; + this.schemaHash = schemaHash; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + @Override + public String toString() { + return schemaVersion + ":" + schemaHash; + } + + public static SchemaVersionAndHash read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, SchemaVersionAndHash.class); + } +} diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java index 376209509dcbc3..fa3ef9e425e02a 100644 --- a/fe/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java @@ -29,9 +29,9 @@ import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.Function; import org.apache.doris.catalog.FunctionSearchDesc; +import org.apache.doris.catalog.Resource; import org.apache.doris.cluster.BaseParam; import org.apache.doris.cluster.Cluster; import org.apache.doris.common.Config; @@ -779,6 +779,7 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { } } catch (Exception e) { LOG.error("Operation Type {}", opCode, e); + System.exit(-1); } } diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index edcf8eea298c57..1eea8bb5b0198b 100644 --- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -52,6 +52,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.SchemaVersionAndHash; import org.apache.doris.common.UserException; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.meta.MetaContext; @@ -62,6 +63,8 @@ import org.apache.doris.transaction.FakeTransactionIDGenerator; import org.apache.doris.transaction.GlobalTransactionMgr; +import com.google.common.collect.Maps; + import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -393,7 +396,9 @@ public void testSerializeOfSchemaChangeJob() throws IOException { SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1,1, "test",600000); schemaChangeJobV2.setStorageFormat(TStorageFormat.V2); Deencapsulation.setField(schemaChangeJobV2, "jobState", AlterJobV2.JobState.FINISHED); - + Map indexSchemaVersionAndHashMap = Maps.newHashMap(); + indexSchemaVersionAndHashMap.put(Long.valueOf(1000), new SchemaVersionAndHash(10, 20)); + Deencapsulation.setField(schemaChangeJobV2, "indexSchemaVersionAndHashMap", indexSchemaVersionAndHashMap); // write schema change job schemaChangeJobV2.write(out); @@ -410,5 +415,12 @@ public void testSerializeOfSchemaChangeJob() throws IOException { Assert.assertEquals(1, result.getJobId()); Assert.assertEquals(AlterJobV2.JobState.FINISHED, result.getJobState()); Assert.assertEquals(TStorageFormat.V2, Deencapsulation.getField(result, "storageFormat")); + + Assert.assertNotNull(Deencapsulation.getField(result, "partitionIndexMap")); + Assert.assertNotNull(Deencapsulation.getField(result, "partitionIndexTabletMap")); + + Map map = Deencapsulation.getField(result, "indexSchemaVersionAndHashMap"); + Assert.assertEquals(10, map.get(1000L).schemaVersion); + Assert.assertEquals(20, map.get(1000L).schemaHash); } }