diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java index b94333e41b985b..8872c9b59658c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java @@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -32,6 +33,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; /** @@ -44,7 +46,7 @@ public class AddPartitionEvent extends MetastorePartitionEvent { // for test public AddPartitionEvent(long eventId, String catalogName, String dbName, String tblName, List partitionNames) { - super(eventId, catalogName, dbName, tblName); + super(eventId, catalogName, dbName, tblName, MetastoreEventType.ADD_PARTITION); this.partitionNames = partitionNames; this.hmsTbl = null; } @@ -71,6 +73,20 @@ private AddPartitionEvent(NotificationEvent event, } } + @Override + protected boolean willChangePartitionName() { + return false; + } + + @Override + public Set getAllPartitionNames() { + return ImmutableSet.copyOf(partitionNames); + } + + public void removePartition(String partitionName) { + partitionNames.remove(partitionName); + } + protected static List getEvents(NotificationEvent event, String catalogName) { return Lists.newArrayList(new AddPartitionEvent(event, catalogName)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java index d56eb52fad56e6..fd31701f712cb8 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage; import java.util.List; +import java.util.Random; /** * MetastoreEvent for ALTER_DATABASE event type @@ -41,13 +42,15 @@ public class AlterDatabaseEvent extends MetastoreEvent { // true if this alter event was due to a rename operation private final boolean isRename; + private final String dbNameAfter; // for test public AlterDatabaseEvent(long eventId, String catalogName, String dbName, boolean isRename) { - super(eventId, catalogName, dbName, null); + super(eventId, catalogName, dbName, null, MetastoreEventType.ALTER_DATABASE); this.isRename = isRename; this.dbBefore = null; this.dbAfter = null; + this.dbNameAfter = isRename ? (dbName + new Random().nextInt(10)) : dbName; } private AlterDatabaseEvent(NotificationEvent event, @@ -61,6 +64,7 @@ private AlterDatabaseEvent(NotificationEvent event, .getAlterDatabaseMessage(event.getMessage()); dbBefore = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjBefore()); dbAfter = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter()); + dbNameAfter = dbAfter.getName(); } catch (Exception e) { throw new MetastoreNotificationException( debugString("Unable to parse the alter database message"), e); @@ -97,6 +101,10 @@ public boolean isRename() { return isRename; } + public String getDbNameAfter() { + return dbNameAfter; + } + @Override protected void process() throws MetastoreNotificationException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java index 0fc6be375d80ce..eb278fe479f67e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java @@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -30,7 +31,8 @@ import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; import java.util.List; -import java.util.Objects; +import java.util.Random; +import java.util.Set; import java.util.stream.Collectors; /** @@ -47,14 +49,14 @@ public class AlterPartitionEvent extends MetastorePartitionEvent { // for test public AlterPartitionEvent(long eventId, String catalogName, String dbName, String tblName, - String partitionNameBefore, String partitionNameAfter) { - super(eventId, catalogName, dbName, tblName); + String partitionNameBefore, boolean isRename) { + super(eventId, catalogName, dbName, tblName, MetastoreEventType.ALTER_PARTITION); this.partitionNameBefore = partitionNameBefore; - this.partitionNameAfter = partitionNameAfter; + this.partitionNameAfter = isRename ? 
(partitionNameBefore + new Random().nextInt(100)) : partitionNameBefore; this.hmsTbl = null; this.partitionAfter = null; this.partitionBefore = null; - isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter); + this.isRename = isRename; } private AlterPartitionEvent(NotificationEvent event, @@ -80,6 +82,24 @@ private AlterPartitionEvent(NotificationEvent event, } } + @Override + protected boolean willChangePartitionName() { + return isRename; + } + + @Override + public Set getAllPartitionNames() { + return ImmutableSet.of(partitionNameBefore); + } + + public String getPartitionNameAfter() { + return partitionNameAfter; + } + + public boolean isRename() { + return isRename; + } + protected static List getEvents(NotificationEvent event, String catalogName) { return Lists.newArrayList(new AlterPartitionEvent(event, catalogName)); @@ -109,10 +129,28 @@ protected void process() throws MetastoreNotificationException { } @Override - protected boolean canBeBatched(MetastoreEvent event) { - return isSameTable(event) - && event instanceof AlterPartitionEvent - && Objects.equals(partitionBefore, ((AlterPartitionEvent) event).partitionBefore) - && Objects.equals(partitionAfter, ((AlterPartitionEvent) event).partitionAfter); + protected boolean canBeBatched(MetastoreEvent that) { + if (!isSameTable(that) || !(that instanceof MetastorePartitionEvent)) { + return false; + } + + // Check if `that` event is a rename event, a rename event can not be batched + // because the process of `that` event will change the reference relation of this partition + MetastorePartitionEvent thatPartitionEvent = (MetastorePartitionEvent) that; + if (thatPartitionEvent.willChangePartitionName()) { + return false; + } + + // `that` event can be batched if this event's partitions contains all of the partitions which `that` event has + // else just remove `that` event's relevant partitions + for (String partitionName : getAllPartitionNames()) { + if (thatPartitionEvent instanceof 
AddPartitionEvent) { + ((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName); + } else if (thatPartitionEvent instanceof DropPartitionEvent) { + ((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName); + } + } + + return getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java index bc09d6ef2c38ad..8c79b6d2487206 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage; import java.util.List; +import java.util.Random; /** * MetastoreEvent for ALTER_TABLE event type @@ -41,17 +42,17 @@ public class AlterTableEvent extends MetastoreTableEvent { // true if this alter event was due to a rename operation private final boolean isRename; private final boolean isView; - private final boolean willCreateOrDropTable; + private final String tblNameAfter; // for test public AlterTableEvent(long eventId, String catalogName, String dbName, String tblName, boolean isRename, boolean isView) { - super(eventId, catalogName, dbName, tblName); + super(eventId, catalogName, dbName, tblName, MetastoreEventType.ALTER_TABLE); this.isRename = isRename; this.isView = isView; this.tableBefore = null; this.tableAfter = null; - this.willCreateOrDropTable = isRename || isView; + this.tblNameAfter = isRename ? 
(tblName + new Random().nextInt(10)) : tblName; } private AlterTableEvent(NotificationEvent event, String catalogName) { @@ -65,6 +66,7 @@ private AlterTableEvent(NotificationEvent event, String catalogName) { .getAlterTableMessage(event.getMessage()); tableAfter = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter()); tableBefore = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore()); + tblNameAfter = tableAfter.getTableName(); } catch (Exception e) { throw new MetastoreNotificationException( debugString("Unable to parse the alter table message"), e); @@ -73,7 +75,6 @@ private AlterTableEvent(NotificationEvent event, String catalogName) { isRename = !tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName()) || !tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName()); isView = tableBefore.isSetViewExpandedText() || tableBefore.isSetViewOriginalText(); - this.willCreateOrDropTable = isRename || isView; } public static List getEvents(NotificationEvent event, @@ -83,7 +84,12 @@ public static List getEvents(NotificationEvent event, @Override protected boolean willCreateOrDropTable() { - return willCreateOrDropTable; + return isRename || isView; + } + + @Override + protected boolean willChangeTableName() { + return isRename; } private void processRecreateTable() throws DdlException { @@ -123,6 +129,10 @@ public boolean isView() { return isView; } + public String getTblNameAfter() { + return tblNameAfter; + } + /** * If the ALTER_TABLE event is due a table rename, this method removes the old table * and creates a new table with the new name. 
Else, we just refresh table @@ -157,15 +167,22 @@ protected boolean canBeBatched(MetastoreEvent that) { return false; } - // `that` event must not be a rename table event - // so if the process of this event will drop this table, - // it can merge all the table's events before - if (willCreateOrDropTable) { + // First check if `that` event is a rename event, a rename event can not be batched + // because the process of `that` event will change the reference relation of this table + // `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false + MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that; + if (thatTblEvent.willChangeTableName()) { + return false; + } + + // Then check if the process of this event will create or drop this table, + // if true then `that` event can be batched + if (willCreateOrDropTable()) { return true; } - // that event must be a MetastoreTableEvent event - // otherwise `isSameTable` will return false - return !((MetastoreTableEvent) that).willCreateOrDropTable(); + // Last, check if the process of `that` event will create or drop this table + // if false then `that` event can be batched + return !thatTblEvent.willCreateOrDropTable(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java index eb8da00cfea0fa..42d813319cc5f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java @@ -34,7 +34,7 @@ public class CreateDatabaseEvent extends MetastoreEvent { // for test public CreateDatabaseEvent(long eventId, String catalogName, String dbName) { - super(eventId, catalogName, dbName, null); + super(eventId, catalogName, dbName, null, MetastoreEventType.CREATE_DATABASE); } private CreateDatabaseEvent(NotificationEvent event, 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java index 1ec8cbfde5eb40..3dff6420a8a665 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java @@ -37,7 +37,7 @@ public class CreateTableEvent extends MetastoreTableEvent { // for test public CreateTableEvent(long eventId, String catalogName, String dbName, String tblName) { - super(eventId, catalogName, dbName, tblName); + super(eventId, catalogName, dbName, tblName, MetastoreEventType.CREATE_TABLE); this.hmsTbl = null; } @@ -66,6 +66,11 @@ protected boolean willCreateOrDropTable() { return true; } + @Override + protected boolean willChangeTableName() { + return false; + } + @Override protected void process() throws MetastoreNotificationException { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java index 6f6364657b343c..3481f832fe0006 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java @@ -32,6 +32,11 @@ */ public class DropDatabaseEvent extends MetastoreEvent { + // for test + public DropDatabaseEvent(long eventId, String catalogName, String dbName) { + super(eventId, catalogName, dbName, null, MetastoreEventType.DROP_DATABASE); + } + private DropDatabaseEvent(NotificationEvent event, String catalogName) { super(event, catalogName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java index 7f8ade0819fce7..738f113f0edc81 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java @@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -31,15 +32,24 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** - * MetastoreEvent for ADD_PARTITION event type + * MetastoreEvent for DROP_PARTITION event type */ public class DropPartitionEvent extends MetastorePartitionEvent { private final Table hmsTbl; private final List partitionNames; + // for test + public DropPartitionEvent(long eventId, String catalogName, String dbName, + String tblName, List partitionNames) { + super(eventId, catalogName, dbName, tblName, MetastoreEventType.DROP_PARTITION); + this.partitionNames = partitionNames; + this.hmsTbl = null; + } + private DropPartitionEvent(NotificationEvent event, String catalogName) { super(event, catalogName); @@ -62,6 +72,20 @@ private DropPartitionEvent(NotificationEvent event, } } + @Override + protected boolean willChangePartitionName() { + return false; + } + + @Override + public Set getAllPartitionNames() { + return ImmutableSet.copyOf(partitionNames); + } + + public void removePartition(String partitionName) { + partitionNames.remove(partitionName); + } + protected static List getEvents(NotificationEvent event, String catalogName) { return Lists.newArrayList( @@ -85,4 +109,30 @@ protected void process() throws MetastoreNotificationException { debugString("Failed to process event"), e); } } + + @Override + protected boolean canBeBatched(MetastoreEvent that) { + if (!isSameTable(that) || !(that instanceof 
MetastorePartitionEvent)) { + return false; + } + + MetastorePartitionEvent thatPartitionEvent = (MetastorePartitionEvent) that; + // Check if `that` event is a rename event, a rename event can not be batched + // because the process of `that` event will change the reference relation of this partition + if (thatPartitionEvent.willChangePartitionName()) { + return false; + } + + // `that` event can be batched if this event's partitions contains all of the partitions which `that` event has + // else just remove `that` event's relevant partitions + for (String partitionName : getAllPartitionNames()) { + if (thatPartitionEvent instanceof AddPartitionEvent) { + ((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName); + } else if (thatPartitionEvent instanceof DropPartitionEvent) { + ((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName); + } + } + + return getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java index 7b43a0966610d5..c333506cad2603 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java @@ -37,7 +37,7 @@ public class DropTableEvent extends MetastoreTableEvent { // for test public DropTableEvent(long eventId, String catalogName, String dbName, String tblName) { - super(eventId, catalogName, dbName, tblName); + super(eventId, catalogName, dbName, tblName, MetastoreEventType.DROP_TABLE); this.tableName = tblName; } @@ -67,6 +67,11 @@ protected boolean willCreateOrDropTable() { return true; } + @Override + protected boolean willChangeTableName() { + return false; + } + @Override protected void process() throws MetastoreNotificationException { try { @@ -80,8 +85,18 @@ protected void 
process() throws MetastoreNotificationException { @Override protected boolean canBeBatched(MetastoreEvent that) { - // `that` event must not be a rename table event - // so merge all events which belong to this table before is ok - return isSameTable(that); + if (!isSameTable(that)) { + return false; + } + + /** + * Check if `that` event is a rename event, a rename event can not be batched + * because the process of `that` event will change the reference relation of this table, + * otherwise it can be batched because this event is a drop-table event + * and the process of this event will drop the whole table, + * and `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false + * */ + MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that; + return !thatTblEvent.willChangeTableName(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java index 3b5650ade4a778..7436d57c981c6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java @@ -38,7 +38,7 @@ public class InsertEvent extends MetastoreTableEvent { // for test public InsertEvent(long eventId, String catalogName, String dbName, String tblName) { - super(eventId, catalogName, dbName, tblName); + super(eventId, catalogName, dbName, tblName, MetastoreEventType.INSERT); this.hmsTbl = null; } @@ -66,11 +66,16 @@ protected boolean willCreateOrDropTable() { return false; } + @Override + protected boolean willChangeTableName() { + return false; + } + @Override protected void process() throws MetastoreNotificationException { try { infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName); - /*** + /** * Only when we use hive client to execute a `INSERT INTO TBL SELECT * ...` or `INSERT INTO TBL ...` sql * to a 
non-partitioned table then the hms will generate an insert event, and there is not * any partition event occurs, but the file cache may has been changed, so we need handle this. @@ -91,8 +96,11 @@ protected boolean canBeBatched(MetastoreEvent that) { return false; } - // that event must be a MetastoreTableEvent event - // otherwise `isSameTable` will return false + /** + * Because the cache of this table will be cleared when handling `InsertEvent`, + * so `that` event can be batched if `that` event will not create or drop this table, + * and `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false + */ return !((MetastoreTableEvent) that).willCreateOrDropTable(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java index 08aff93ddaa415..b4fc963d9802a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java @@ -58,12 +58,13 @@ public abstract class MetastoreEvent { protected final String catalogName; // for test - protected MetastoreEvent(long eventId, String catalogName, String dbName, String tblName) { + protected MetastoreEvent(long eventId, String catalogName, String dbName, + String tblName, MetastoreEventType eventType) { this.eventId = eventId; this.catalogName = catalogName; this.dbName = dbName; this.tblName = tblName; - this.eventType = null; + this.eventType = eventType; this.metastoreNotificationEvent = null; this.event = null; } @@ -97,7 +98,6 @@ public String getTblName() { /** * Checks if the given event can be batched into this event. Default behavior is * to return false which can be overridden by a sub-class. - * The current version is relatively simple to process batch events, so all that need to be processed are true. 
* * @param event The event under consideration to be batched into this event. * @return false if event cannot be batched into this event; otherwise true. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java index a5bf0d953ced6c..aabc562dba126c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java @@ -99,7 +99,7 @@ public List createBatchEvents(String catalogName, List createBatchEvents(String catalogName, List filteredEvents = eventsCopy.stream().filter(Objects::nonNull) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java index f8bb457ea3c248..8a66dd1d6f954b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java @@ -20,14 +20,17 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import java.util.Set; + /** * Base class for all the partition events */ public abstract class MetastorePartitionEvent extends MetastoreTableEvent { // for test - protected MetastorePartitionEvent(long eventId, String catalogName, String dbName, String tblName) { - super(eventId, catalogName, dbName, tblName); + protected MetastorePartitionEvent(long eventId, String catalogName, String dbName, + String tblName, MetastoreEventType eventType) { + super(eventId, catalogName, dbName, tblName, eventType); } protected MetastorePartitionEvent(NotificationEvent event, String catalogName) { @@ -37,4 +40,15 @@ protected MetastorePartitionEvent(NotificationEvent event, String catalogName) { protected 
boolean willCreateOrDropTable() { return false; } + + protected boolean willChangeTableName() { + return false; + } + + /** + * Returns if the process of this event will rename this partition. + */ + protected abstract boolean willChangePartitionName(); + + public abstract Set getAllPartitionNames(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java index c797c1c08dac4a..4e9713a2758b3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java @@ -31,8 +31,9 @@ public abstract class MetastoreTableEvent extends MetastoreEvent { // for test - protected MetastoreTableEvent(long eventId, String catalogName, String dbName, String tblName) { - super(eventId, catalogName, dbName, tblName); + protected MetastoreTableEvent(long eventId, String catalogName, String dbName, + String tblName, MetastoreEventType eventType) { + super(eventId, catalogName, dbName, tblName, eventType); } protected MetastoreTableEvent(NotificationEvent event, String catalogName) { @@ -67,6 +68,11 @@ protected boolean isSameTable(MetastoreEvent that) { */ protected abstract boolean willCreateOrDropTable(); + /** + * Returns if the process of this event will rename this table. 
+ */ + protected abstract boolean willChangeTableName(); + public TableKey getTableKey() { return new TableKey(catalogName, dbName, tblName); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java index ba18c84bd7e0e1..c9e566dc9d8d6b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java @@ -23,171 +23,450 @@ import org.apache.doris.datasource.hive.event.AlterTableEvent; import org.apache.doris.datasource.hive.event.CreateDatabaseEvent; import org.apache.doris.datasource.hive.event.CreateTableEvent; +import org.apache.doris.datasource.hive.event.DropDatabaseEvent; +import org.apache.doris.datasource.hive.event.DropPartitionEvent; import org.apache.doris.datasource.hive.event.DropTableEvent; import org.apache.doris.datasource.hive.event.InsertEvent; import org.apache.doris.datasource.hive.event.MetastoreEvent; import org.apache.doris.datasource.hive.event.MetastoreEventFactory; -import org.apache.hadoop.util.Lists; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.function.Function; public class MetastoreEventFactoryTest { + private static final MetastoreEventFactory factory = new MetastoreEventFactory(); + private static final Random random = new Random(System.currentTimeMillis()); + private static final String 
testCtl = "test_ctl"; + + private static final Function createDatabaseEventProducer = eventId + -> new CreateDatabaseEvent(eventId, testCtl, randomDb()); + + private static final Function alterDatabaseEventProducer = eventId + -> new AlterDatabaseEvent(eventId, testCtl, randomDb(), randomBool(0.0001D)); + + private static final Function dropDatabaseEventProducer = eventId + -> new DropDatabaseEvent(eventId, testCtl, randomDb()); + + private static final Function createTableEventProducer = eventId + -> new CreateTableEvent(eventId, testCtl, randomDb(), randomTbl()); + + private static final Function alterTableEventProducer = eventId + -> new AlterTableEvent(eventId, testCtl, randomDb(), randomTbl(), + randomBool(0.1D), randomBool(0.1D)); + + private static final Function insertEventProducer = eventId + -> new InsertEvent(eventId, testCtl, randomDb(), randomTbl()); + + private static final Function dropTableEventProducer = eventId + -> new DropTableEvent(eventId, testCtl, randomDb(), randomTbl()); + + private static final Function addPartitionEventProducer = eventId + -> new AddPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartitions()); + + private static final Function alterPartitionEventProducer = eventId + -> new AlterPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartition(), + randomBool(0.1D)); + + private static final Function dropPartitionEventProducer = eventId + -> new DropPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartitions()); + + private static final List> eventProducers = Arrays.asList( + createDatabaseEventProducer, alterDatabaseEventProducer, dropDatabaseEventProducer, + createTableEventProducer, alterTableEventProducer, insertEventProducer, dropTableEventProducer, + addPartitionEventProducer, alterPartitionEventProducer, dropPartitionEventProducer); + + private static String randomDb() { + return "db_" + random.nextInt(10); + } + + private static String randomTbl() { + return "tbl_" + 
random.nextInt(100); + } + + private static String randomPartition() { + return "partition_" + random.nextInt(1000); + } + + private static List randomPartitions() { + int times = random.nextInt(100) + 1; + Set partitions = Sets.newHashSet(); + for (int i = 0; i < times; i++) { + partitions.add(randomPartition()); + } + return Lists.newArrayList(partitions); + } + + private static boolean randomBool(double possibility) { + Preconditions.checkArgument(possibility >= 0.0D && possibility <= 1.0D); + int upperBound = (int) Math.floor(1000000 * possibility); + return random.nextInt(1000000) <= upperBound; + } + + // define MockCatalog/MockDatabase/MockTable/MockPartition to simulate the real catalog/database/table/partition + private static class MockCatalog { + private String ctlName; + private Map databases = Maps.newHashMap(); + + private MockCatalog(String ctlName) { + this.ctlName = ctlName; + } + + @Override + public int hashCode() { + return 31 * Objects.hash(ctlName) + Arrays.hashCode( + databases.values().stream().sorted(Comparator.comparing(d -> d.dbName)).toArray()); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof MockCatalog)) { + return false; + } + if (!Objects.equals(this.ctlName, ((MockCatalog) other).ctlName)) { + return false; + } + Object[] sortedDatabases = databases.values().stream() + .sorted(Comparator.comparing(d -> d.dbName)).toArray(); + Object[] otherSortedDatabases = ((MockCatalog) other).databases.values().stream() + .sorted(Comparator.comparing(d -> d.dbName)).toArray(); + return Arrays.equals(sortedDatabases, otherSortedDatabases); + } + + public MockCatalog copy() { + MockCatalog mockCatalog = new MockCatalog(this.ctlName); + mockCatalog.databases.putAll(this.databases); + return mockCatalog; + } + } + + private static class MockDatabase { + private String dbName; + private Map tables = Maps.newHashMap(); + + private MockDatabase(String dbName) { + this.dbName = dbName; + } + + @Override + public int 
hashCode() { + return 31 * Objects.hash(dbName) + Arrays.hashCode( + tables.values().stream().sorted(Comparator.comparing(t -> t.tblName)).toArray()); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof MockDatabase)) { + return false; + } + if (!Objects.equals(this.dbName, ((MockDatabase) other).dbName)) { + return false; + } + Object[] sortedTables = tables.values().stream() + .sorted(Comparator.comparing(t -> t.tblName)).toArray(); + Object[] otherSortedTables = ((MockDatabase) other).tables.values().stream() + .sorted(Comparator.comparing(t -> t.tblName)).toArray(); + return Arrays.equals(sortedTables, otherSortedTables); + } + + public MockDatabase copy() { + MockDatabase mockDatabase = new MockDatabase(this.dbName); + mockDatabase.tables.putAll(this.tables); + return mockDatabase; + } + } + + private static class MockTable { + private String tblName; + // use this field to mark if the table has been refreshed + private boolean refreshed; + private Map partitions = Maps.newHashMap(); + + private MockTable(String tblName) { + this.tblName = tblName; + } + + public void refresh() { + this.refreshed = true; + } + + @Override + public int hashCode() { + return 31 * Objects.hash(tblName, refreshed) + Arrays.hashCode( + partitions.values().stream().sorted(Comparator.comparing(p -> p.partitionName)).toArray()); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof MockTable)) { + return false; + } + if (!Objects.equals(this.tblName, ((MockTable) other).tblName)) { + return false; + } + if (refreshed != ((MockTable) other).refreshed) { + return false; + } + Object[] sortedPartitions = partitions.values().stream() + .sorted(Comparator.comparing(p -> p.partitionName)).toArray(); + Object[] otherSortedPartitions = ((MockTable) other).partitions.values().stream() + .sorted(Comparator.comparing(p -> p.partitionName)).toArray(); + return Arrays.equals(sortedPartitions, otherSortedPartitions); + } + + public 
MockTable copy() { + MockTable copyTbl = new MockTable(this.tblName); + copyTbl.partitions.putAll(this.partitions); + return copyTbl; + } + } + + private static class MockPartition { + private String partitionName; + // use this filed to mark if the partition has been refreshed + private boolean refreshed; + + private MockPartition(String partitionName) { + this.partitionName = partitionName; + this.refreshed = false; + } + + public void refresh() { + this.refreshed = true; + } + + @Override + public int hashCode() { + return Objects.hash(refreshed, partitionName); + } + + @Override + public boolean equals(Object other) { + return other instanceof MockPartition + && refreshed == ((MockPartition) other).refreshed + && Objects.equals(this.partitionName, ((MockPartition) other).partitionName); + } + } + + // simulate the processes when handling hms events + private void processEvent(MockCatalog ctl, MetastoreEvent event) { + switch (event.getEventType()) { + + case CREATE_DATABASE: + MockDatabase database = new MockDatabase(event.getDbName()); + ctl.databases.put(database.dbName, database); + break; + + case DROP_DATABASE: + ctl.databases.remove(event.getDbName()); + break; + + case ALTER_DATABASE: + String dbName = event.getDbName(); + if (((AlterDatabaseEvent) event).isRename()) { + ctl.databases.remove(dbName); + MockDatabase newDatabase = new MockDatabase(((AlterDatabaseEvent) event).getDbNameAfter()); + ctl.databases.put(newDatabase.dbName, newDatabase); + } else { + if (ctl.databases.containsKey(event.getDbName())) { + ctl.databases.get(event.getDbName()).tables.clear(); + } + } + break; + + case CREATE_TABLE: + if (ctl.databases.containsKey(event.getDbName())) { + MockTable tbl = new MockTable(event.getTblName()); + ctl.databases.get(event.getDbName()).tables.put(event.getTblName(), tbl); + } + break; + + case DROP_TABLE: + if (ctl.databases.containsKey(event.getDbName())) { + ctl.databases.get(event.getDbName()).tables.remove(event.getTblName()); + } + break; 
+ + case ALTER_TABLE: + case INSERT: + if (ctl.databases.containsKey(event.getDbName())) { + if (event instanceof AlterTableEvent + && (((AlterTableEvent) event).isRename() || ((AlterTableEvent) event).isView())) { + ctl.databases.get(event.getDbName()).tables.remove(event.getTblName()); + MockTable tbl = new MockTable(((AlterTableEvent) event).getTblNameAfter()); + ctl.databases.get(event.getDbName()).tables.put(tbl.tblName, tbl); + } else { + MockTable tbl = ctl.databases.get(event.getDbName()).tables.get(event.getTblName()); + if (tbl != null) { + tbl.partitions.clear(); + tbl.refresh(); + } + } + } + break; + + case ADD_PARTITION: + if (ctl.databases.containsKey(event.getDbName())) { + MockTable tbl = ctl.databases.get(event.getDbName()).tables.get(event.getTblName()); + if (tbl != null) { + for (String partitionName : ((AddPartitionEvent) event).getAllPartitionNames()) { + MockPartition partition = new MockPartition(partitionName); + tbl.partitions.put(partitionName, partition); + } + } + } + break; + + case ALTER_PARTITION: + if (ctl.databases.containsKey(event.getDbName())) { + MockTable tbl = ctl.databases.get(event.getDbName()).tables.get(event.getTblName()); + AlterPartitionEvent alterPartitionEvent = ((AlterPartitionEvent) event); + if (tbl != null) { + if (alterPartitionEvent.isRename()) { + for (String partitionName : alterPartitionEvent.getAllPartitionNames()) { + tbl.partitions.remove(partitionName); + } + MockPartition partition = new MockPartition(alterPartitionEvent.getPartitionNameAfter()); + tbl.partitions.put(partition.partitionName, partition); + } else { + for (String partitionName : alterPartitionEvent.getAllPartitionNames()) { + MockPartition partition = tbl.partitions.get(partitionName); + if (partition != null) { + partition.refresh(); + } + } + } + } + } + break; + + case DROP_PARTITION: + if (ctl.databases.containsKey(event.getDbName())) { + MockTable tbl = ctl.databases.get(event.getDbName()).tables.get(event.getTblName()); + if (tbl 
!= null) { + for (String partitionName : ((DropPartitionEvent) event).getAllPartitionNames()) { + tbl.partitions.remove(partitionName); + } + } + } + break; + + default: + Assertions.fail("Unknown event type : " + event.getEventType()); + } + } + + static class EventProducer { + // every type of event has a proportion + // for instance, if the `CreateDatabaseEvent`'s proportion is 1 + // and the `AlterDatabaseEvent`'s proportion is 10 + // the event count of `AlterDatabaseEvent` is always about 10 times as the `CreateDatabaseEvent` + private final List proportions; + private final int sumOfProportions; + + EventProducer(List proportions) { + Preconditions.checkArgument(CollectionUtils.isNotEmpty(proportions) + && proportions.size() == eventProducers.size()); + this.proportions = ImmutableList.copyOf(proportions); + this.sumOfProportions = proportions.stream().mapToInt(proportion -> proportion).sum(); + } + + public MetastoreEvent produceOneEvent(long eventId) { + return eventProducers.get(calIndex(random.nextInt(sumOfProportions))).apply(eventId); + } + + private int calIndex(int val) { + int currentIndex = 0; + int currentBound = proportions.get(currentIndex); + while (currentIndex < proportions.size() - 1) { + if (val > currentBound) { + currentBound += proportions.get(++currentIndex); + } else { + return currentIndex; + } + } + return proportions.size() - 1; + } + } @Test public void testCreateBatchEvents() { - AlterPartitionEvent e1 = new AlterPartitionEvent(1L, "test_ctl", "test_db", "t1", "p1", "p1"); - AlterPartitionEvent e2 = new AlterPartitionEvent(2L, "test_ctl", "test_db", "t1", "p1", "p1"); - AddPartitionEvent e3 = new AddPartitionEvent(3L, "test_ctl", "test_db", "t1", Arrays.asList("p1")); - AlterTableEvent e4 = new AlterTableEvent(4L, "test_ctl", "test_db", "t1", false, false); - AlterTableEvent e5 = new AlterTableEvent(5L, "test_ctl", "test_db", "t1", true, false); - AlterTableEvent e6 = new AlterTableEvent(6L, "test_ctl", "test_db", "t1", false, 
true); - DropTableEvent e7 = new DropTableEvent(7L, "test_ctl", "test_db", "t1"); - InsertEvent e8 = new InsertEvent(8L, "test_ctl", "test_db", "t1"); - CreateDatabaseEvent e9 = new CreateDatabaseEvent(9L, "test_ctl", "test_db2"); - AlterPartitionEvent e10 = new AlterPartitionEvent(10L, "test_ctl", "test_db", "t2", "p1", "p1"); - AlterTableEvent e11 = new AlterTableEvent(11L, "test_ctl", "test_db", "t1", false, false); - CreateTableEvent e12 = new CreateTableEvent(12L, "test_ctl", "test_db", "t1"); - AlterDatabaseEvent e13 = new AlterDatabaseEvent(13L, "test_ctl", "test_db", true); - AlterDatabaseEvent e14 = new AlterDatabaseEvent(14L, "test_ctl", "test_db", false); - - List mergedEvents; - List testEvents = Lists.newLinkedList(); - - testEvents.add(e1); - testEvents.add(e2); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 1); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 2L); - - testEvents.clear(); - testEvents.add(e1); - testEvents.add(e2); - testEvents.add(e3); - testEvents.add(e9); - testEvents.add(e10); - testEvents.add(e4); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 3); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 9L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L); - - // because e5 is a rename event, it will not be merged - testEvents.clear(); - testEvents.add(e1); - testEvents.add(e2); - testEvents.add(e10); - testEvents.add(e5); - testEvents.add(e4); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 3); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L); - Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L); - - testEvents.clear(); - testEvents.add(e1); - 
testEvents.add(e2); - testEvents.add(e10); - testEvents.add(e6); - testEvents.add(e4); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 3); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 6L); - Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L); - - testEvents.clear(); - testEvents.add(e1); - testEvents.add(e2); - testEvents.add(e10); - testEvents.add(e4); - testEvents.add(e11); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 2); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 11L); - - testEvents.clear(); - testEvents.add(e1); - testEvents.add(e2); - testEvents.add(e10); - testEvents.add(e4); - testEvents.add(e8); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 2); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 8L); - - // because e5 is a rename event, it will not be merged - testEvents.clear(); - testEvents.add(e1); - testEvents.add(e2); - testEvents.add(e10); - testEvents.add(e5); - testEvents.add(e8); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 3); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L); - Assertions.assertTrue(mergedEvents.get(2).getEventId() == 8L); - - testEvents.clear(); - testEvents.add(e1); - testEvents.add(e2); - testEvents.add(e10); - testEvents.add(e12); - testEvents.add(e4); - testEvents.add(e7); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 2); - 
Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 7L); - - // because e5 is a rename event, it will not be merged - testEvents.clear(); - testEvents.add(e1); - testEvents.add(e2); - testEvents.add(e10); - testEvents.add(e5); - testEvents.add(e7); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 3); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L); - Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L); - - testEvents.clear(); - testEvents.add(e1); - testEvents.add(e2); - testEvents.add(e10); - testEvents.add(e4); - testEvents.add(e13); - testEvents.add(e7); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 4); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 4L); - Assertions.assertTrue(mergedEvents.get(2).getEventId() == 13L); - Assertions.assertTrue(mergedEvents.get(3).getEventId() == 7L); - - testEvents.clear(); - testEvents.add(e1); - testEvents.add(e2); - testEvents.add(e10); - testEvents.add(e4); - testEvents.add(e14); - testEvents.add(e7); - mergedEvents = factory.createBatchEvents("test_ctl", testEvents); - Assertions.assertTrue(mergedEvents.size() == 3); - Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L); - Assertions.assertTrue(mergedEvents.get(1).getEventId() == 14L); - Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L); + // for catalog initialization, so just produce CreateXXXEvent / AddXXXEvent + List initProportions = Lists.newArrayList( + 1, // CreateDatabaseEvent + 0, // AlterDatabaseEvent + 0, // DropDatabaseEvent + 10, // CreateTableEvent + 0, // AlterTableEvent + 0, // InsertEvent + 0, // DropTableEvent + 100, // AddPartitionEvent + 0, // 
AlterPartitionEvent + 0 // DropPartitionEvent + ); + + List proportions = Lists.newArrayList( + 5, // CreateDatabaseEvent + 1, // AlterDatabaseEvent + 5, // DropDatabaseEvent + 100, // CreateTableEvent + 20000, // AlterTableEvent + 2000, // InsertEvent + 5000, // DropTableEvent + 10000, // AddPartitionEvent + 50000, // AlterPartitionEvent + 20000 // DropPartitionEvent + ); + EventProducer initProducer = new EventProducer(initProportions); + EventProducer producer = new EventProducer(proportions); + + for (int i = 0; i < 200; i++) { + // create a test catalog and do initialization + MockCatalog testCatalog = new MockCatalog(testCtl); + List initEvents = Lists.newArrayListWithCapacity(1000); + for (int j = 0; j < 1000; j++) { + initEvents.add(initProducer.produceOneEvent(j)); + } + for (MetastoreEvent event : initEvents) { + processEvent(testCatalog, event); + } + + // copy the test catalog to the validate catalog + MockCatalog validateCatalog = testCatalog.copy(); + + List events = Lists.newArrayListWithCapacity(1000); + for (int j = 0; j < 1000; j++) { + events.add(producer.produceOneEvent(j)); + } + List mergedEvents = factory.createBatchEvents(testCtl, events); + + for (MetastoreEvent event : events) { + processEvent(validateCatalog, event); + } + + for (MetastoreEvent event : mergedEvents) { + processEvent(testCatalog, event); + } + + // the test catalog should be equals to the validate catalog + // otherwise we must have some bugs at `factory.createBatchEvents()` + Assertions.assertEquals(testCatalog, validateCatalog); + } } }