Skip to content

Commit

Permalink
[Enhancement](multi-catalog) merge hms partition events. (#22869)
Browse files Browse the repository at this point in the history
This pr mainly has two changes:

1. add some merge processes about partition events
2. add a ut for `MetastoreEventFactory`. First add some mock classes (`MockCatalog`/`MockDatabase` ...) to simulate the real hms catalog/databases/tables/partitions,  then create a event producer which can produce every kinds of `MetastoreEvent`s randomly. Use two catalogs for test, one is named `testCatalog` and the other is the `validateCatalog`, use event producer to produce many events and let `validateCatalog` to handle all of the events, but `testCatalog` just handles the events  which have been merged by `MetastoreEventFactory`, check if the `validateCatalog` is equals to `testCatalog`.
  • Loading branch information
dutyu authored and xiaokang committed Sep 10, 2023
1 parent 0a45639 commit 9ebe3cd
Show file tree
Hide file tree
Showing 15 changed files with 653 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.DdlException;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
Expand All @@ -32,6 +33,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand All @@ -44,7 +46,7 @@ public class AddPartitionEvent extends MetastorePartitionEvent {
// for test
public AddPartitionEvent(long eventId, String catalogName, String dbName,
String tblName, List<String> partitionNames) {
super(eventId, catalogName, dbName, tblName);
super(eventId, catalogName, dbName, tblName, MetastoreEventType.ADD_PARTITION);
this.partitionNames = partitionNames;
this.hmsTbl = null;
}
Expand All @@ -71,6 +73,20 @@ private AddPartitionEvent(NotificationEvent event,
}
}

@Override
protected boolean willChangePartitionName() {
return false;
}

@Override
public Set<String> getAllPartitionNames() {
return ImmutableSet.copyOf(partitionNames);
}

public void removePartition(String partitionName) {
partitionNames.remove(partitionName);
}

protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new AddPartitionEvent(event, catalogName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;

import java.util.List;
import java.util.Random;

/**
* MetastoreEvent for ALTER_DATABASE event type
Expand All @@ -41,13 +42,15 @@ public class AlterDatabaseEvent extends MetastoreEvent {

// true if this alter event was due to a rename operation
private final boolean isRename;
private final String dbNameAfter;

// for test
public AlterDatabaseEvent(long eventId, String catalogName, String dbName, boolean isRename) {
super(eventId, catalogName, dbName, null);
super(eventId, catalogName, dbName, null, MetastoreEventType.ALTER_DATABASE);
this.isRename = isRename;
this.dbBefore = null;
this.dbAfter = null;
this.dbNameAfter = isRename ? (dbName + new Random().nextInt(10)) : dbName;
}

private AlterDatabaseEvent(NotificationEvent event,
Expand All @@ -61,6 +64,7 @@ private AlterDatabaseEvent(NotificationEvent event,
.getAlterDatabaseMessage(event.getMessage());
dbBefore = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjBefore());
dbAfter = Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter());
dbNameAfter = dbAfter.getName();
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Unable to parse the alter database message"), e);
Expand Down Expand Up @@ -97,6 +101,10 @@ public boolean isRename() {
return isRename;
}

public String getDbNameAfter() {
return dbNameAfter;
}

@Override
protected void process() throws MetastoreNotificationException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.DdlException;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
Expand All @@ -30,7 +31,8 @@
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;

import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand All @@ -47,14 +49,14 @@ public class AlterPartitionEvent extends MetastorePartitionEvent {

// for test
public AlterPartitionEvent(long eventId, String catalogName, String dbName, String tblName,
String partitionNameBefore, String partitionNameAfter) {
super(eventId, catalogName, dbName, tblName);
String partitionNameBefore, boolean isRename) {
super(eventId, catalogName, dbName, tblName, MetastoreEventType.ALTER_PARTITION);
this.partitionNameBefore = partitionNameBefore;
this.partitionNameAfter = partitionNameAfter;
this.partitionNameAfter = isRename ? (partitionNameBefore + new Random().nextInt(100)) : partitionNameBefore;
this.hmsTbl = null;
this.partitionAfter = null;
this.partitionBefore = null;
isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter);
this.isRename = isRename;
}

private AlterPartitionEvent(NotificationEvent event,
Expand All @@ -80,6 +82,24 @@ private AlterPartitionEvent(NotificationEvent event,
}
}

@Override
protected boolean willChangePartitionName() {
return isRename;
}

@Override
public Set<String> getAllPartitionNames() {
return ImmutableSet.of(partitionNameBefore);
}

public String getPartitionNameAfter() {
return partitionNameAfter;
}

public boolean isRename() {
return isRename;
}

protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
return Lists.newArrayList(new AlterPartitionEvent(event, catalogName));
Expand Down Expand Up @@ -109,10 +129,28 @@ protected void process() throws MetastoreNotificationException {
}

@Override
protected boolean canBeBatched(MetastoreEvent event) {
return isSameTable(event)
&& event instanceof AlterPartitionEvent
&& Objects.equals(partitionBefore, ((AlterPartitionEvent) event).partitionBefore)
&& Objects.equals(partitionAfter, ((AlterPartitionEvent) event).partitionAfter);
protected boolean canBeBatched(MetastoreEvent that) {
if (!isSameTable(that) || !(that instanceof MetastorePartitionEvent)) {
return false;
}

// Check if `that` event is a rename event, a rename event can not be batched
// because the process of `that` event will change the reference relation of this partition
MetastorePartitionEvent thatPartitionEvent = (MetastorePartitionEvent) that;
if (thatPartitionEvent.willChangePartitionName()) {
return false;
}

// `that` event can be batched if this event's partitions contains all of the partitions which `that` event has
// else just remove `that` event's relevant partitions
for (String partitionName : getAllPartitionNames()) {
if (thatPartitionEvent instanceof AddPartitionEvent) {
((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName);
} else if (thatPartitionEvent instanceof DropPartitionEvent) {
((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName);
}
}

return getAllPartitionNames().containsAll(thatPartitionEvent.getAllPartitionNames());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;

import java.util.List;
import java.util.Random;

/**
* MetastoreEvent for ALTER_TABLE event type
Expand All @@ -41,17 +42,17 @@ public class AlterTableEvent extends MetastoreTableEvent {
// true if this alter event was due to a rename operation
private final boolean isRename;
private final boolean isView;
private final boolean willCreateOrDropTable;
private final String tblNameAfter;

// for test
public AlterTableEvent(long eventId, String catalogName, String dbName,
String tblName, boolean isRename, boolean isView) {
super(eventId, catalogName, dbName, tblName);
super(eventId, catalogName, dbName, tblName, MetastoreEventType.ALTER_TABLE);
this.isRename = isRename;
this.isView = isView;
this.tableBefore = null;
this.tableAfter = null;
this.willCreateOrDropTable = isRename || isView;
this.tblNameAfter = isRename ? (tblName + new Random().nextInt(10)) : tblName;
}

private AlterTableEvent(NotificationEvent event, String catalogName) {
Expand All @@ -65,6 +66,7 @@ private AlterTableEvent(NotificationEvent event, String catalogName) {
.getAlterTableMessage(event.getMessage());
tableAfter = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter());
tableBefore = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore());
tblNameAfter = tableAfter.getTableName();
} catch (Exception e) {
throw new MetastoreNotificationException(
debugString("Unable to parse the alter table message"), e);
Expand All @@ -73,7 +75,6 @@ private AlterTableEvent(NotificationEvent event, String catalogName) {
isRename = !tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
|| !tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
isView = tableBefore.isSetViewExpandedText() || tableBefore.isSetViewOriginalText();
this.willCreateOrDropTable = isRename || isView;
}

public static List<MetastoreEvent> getEvents(NotificationEvent event,
Expand All @@ -83,7 +84,12 @@ public static List<MetastoreEvent> getEvents(NotificationEvent event,

@Override
protected boolean willCreateOrDropTable() {
return willCreateOrDropTable;
return isRename || isView;
}

@Override
protected boolean willChangeTableName() {
return isRename;
}

private void processRecreateTable() throws DdlException {
Expand Down Expand Up @@ -123,6 +129,10 @@ public boolean isView() {
return isView;
}

public String getTblNameAfter() {
return tblNameAfter;
}

/**
* If the ALTER_TABLE event is due a table rename, this method removes the old table
* and creates a new table with the new name. Else, we just refresh table
Expand Down Expand Up @@ -157,15 +167,22 @@ protected boolean canBeBatched(MetastoreEvent that) {
return false;
}

// `that` event must not be a rename table event
// so if the process of this event will drop this table,
// it can merge all the table's events before
if (willCreateOrDropTable) {
// First check if `that` event is a rename event, a rename event can not be batched
// because the process of `that` event will change the reference relation of this table
// `that` event must be a MetastoreTableEvent event otherwise `isSameTable` will return false
MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that;
if (thatTblEvent.willChangeTableName()) {
return false;
}

// Then check if the process of this event will create or drop this table,
// if true then `that` event can be batched
if (willCreateOrDropTable()) {
return true;
}

// that event must be a MetastoreTableEvent event
// otherwise `isSameTable` will return false
return !((MetastoreTableEvent) that).willCreateOrDropTable();
// Last, check if the process of `that` event will create or drop this table
// if false then `that` event can be batched
return !thatTblEvent.willCreateOrDropTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class CreateDatabaseEvent extends MetastoreEvent {

// for test
public CreateDatabaseEvent(long eventId, String catalogName, String dbName) {
super(eventId, catalogName, dbName, null);
super(eventId, catalogName, dbName, null, MetastoreEventType.CREATE_DATABASE);
}

private CreateDatabaseEvent(NotificationEvent event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class CreateTableEvent extends MetastoreTableEvent {

// for test
public CreateTableEvent(long eventId, String catalogName, String dbName, String tblName) {
super(eventId, catalogName, dbName, tblName);
super(eventId, catalogName, dbName, tblName, MetastoreEventType.CREATE_TABLE);
this.hmsTbl = null;
}

Expand Down Expand Up @@ -66,6 +66,11 @@ protected boolean willCreateOrDropTable() {
return true;
}

@Override
protected boolean willChangeTableName() {
return false;
}

@Override
protected void process() throws MetastoreNotificationException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
*/
public class DropDatabaseEvent extends MetastoreEvent {

// for test
public DropDatabaseEvent(long eventId, String catalogName, String dbName) {
super(eventId, catalogName, dbName, null, MetastoreEventType.DROP_DATABASE);
}

private DropDatabaseEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Expand Down
Loading

0 comments on commit 9ebe3cd

Please sign in to comment.