Skip to content

Commit

Permalink
[Enhancement](multi-catalog) merge hms partition events.
Browse files Browse the repository at this point in the history
  • Loading branch information
王翔宇 committed Aug 12, 2023
1 parent 154c257 commit b6ae374
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Set<String> getAllPartitionNames() {
return ImmutableSet.copyOf(partitionNames);
}

public void skipOnePartition(String partitionName) {
public void removePartition(String partitionName) {
partitionNames.remove(partitionName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;

import java.util.List;
import java.util.Random;

/**
* MetastoreEvent for ALTER_DATABASE event type
Expand All @@ -49,7 +50,7 @@ public AlterDatabaseEvent(long eventId, String catalogName, String dbName, boole
this.isRename = isRename;
this.dbBefore = null;
this.dbAfter = null;
this.dbNameAfter = isRename ? (dbName + "_new") : dbName;
this.dbNameAfter = isRename ? (dbName + new Random().nextInt(10)) : dbName;
}

private AlterDatabaseEvent(NotificationEvent event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;

import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -51,7 +52,7 @@ public AlterPartitionEvent(long eventId, String catalogName, String dbName, Stri
String partitionNameBefore, boolean isRename) {
super(eventId, catalogName, dbName, tblName, MetastoreEventType.ALTER_PARTITION);
this.partitionNameBefore = partitionNameBefore;
this.partitionNameAfter = isRename ? (partitionNameBefore + "_new") : partitionNameBefore;
this.partitionNameAfter = isRename ? (partitionNameBefore + new Random().nextInt(100)) : partitionNameBefore;
this.hmsTbl = null;
this.partitionAfter = null;
this.partitionBefore = null;
Expand Down Expand Up @@ -140,9 +141,9 @@ protected boolean canBeBatched(MetastoreEvent that) {

for (String partitionName : getAllPartitionNames()) {
if (thatPartitionEvent instanceof AddPartitionEvent) {
((AddPartitionEvent) thatPartitionEvent).skipOnePartition(partitionName);
((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName);
} else if (thatPartitionEvent instanceof DropPartitionEvent) {
((DropPartitionEvent) thatPartitionEvent).skipOnePartition(partitionName);
((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterTableMessage;

import java.util.List;
import java.util.Random;

/**
* MetastoreEvent for ALTER_TABLE event type
Expand All @@ -51,7 +52,7 @@ public AlterTableEvent(long eventId, String catalogName, String dbName,
this.isView = isView;
this.tableBefore = null;
this.tableAfter = null;
this.tblNameAfter = isRename ? (tblName + "_new") : tblName;
this.tblNameAfter = isRename ? (tblName + new Random().nextInt(10)) : tblName;
}

private AlterTableEvent(NotificationEvent event, String catalogName) {
Expand Down Expand Up @@ -169,7 +170,6 @@ protected boolean canBeBatched(MetastoreEvent that) {
// that event must be a MetastoreTableEvent event
// otherwise `isSameTable` will return false
MetastoreTableEvent thatTblEvent = (MetastoreTableEvent) that;

if (thatTblEvent.willChangeTableName()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Set<String> getAllPartitionNames() {
return ImmutableSet.copyOf(partitionNames);
}

public void skipOnePartition(String partitionName) {
public void removePartition(String partitionName) {
partitionNames.remove(partitionName);
}

Expand Down Expand Up @@ -123,9 +123,9 @@ protected boolean canBeBatched(MetastoreEvent that) {

for (String partitionName : getAllPartitionNames()) {
if (thatPartitionEvent instanceof AddPartitionEvent) {
((AddPartitionEvent) thatPartitionEvent).skipOnePartition(partitionName);
((AddPartitionEvent) thatPartitionEvent).removePartition(partitionName);
} else if (thatPartitionEvent instanceof DropPartitionEvent) {
((DropPartitionEvent) thatPartitionEvent).skipOnePartition(partitionName);
((DropPartitionEvent) thatPartitionEvent).removePartition(partitionName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class MetastoreEventFactoryTest {

private static final Function<Long, AlterTableEvent> alterTableEventProducer = eventId
-> new AlterTableEvent(eventId, testCtl, randomDb(), randomTbl(),
randomBool(0.001D), randomBool(0.01D));
randomBool(0.01D), randomBool(0.01D));

private static final Function<Long, InsertEvent> insertEventProducer = eventId
-> new InsertEvent(eventId, testCtl, randomDb(), randomTbl());
Expand All @@ -82,7 +82,7 @@ public class MetastoreEventFactoryTest {

private static final Function<Long, AlterPartitionEvent> alterPartitionEventProducer = eventId
-> new AlterPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartition(),
randomBool(0.001D));
randomBool(0.01D));

private static final Function<Long, DropPartitionEvent> dropPartitionEventProducer = eventId
-> new DropPartitionEvent(eventId, testCtl, randomDb(), randomTbl(), randomPartitions());
Expand All @@ -93,15 +93,15 @@ public class MetastoreEventFactoryTest {
addPartitionEventProducer, alterPartitionEventProducer, dropPartitionEventProducer);

private static String randomDb() {
return "db_" + random.nextInt(5);
return "db_" + random.nextInt(10);
}

private static String randomTbl() {
return "tbl_" + random.nextInt(10);
return "tbl_" + random.nextInt(100);
}

private static String randomPartition() {
return "partition_" + random.nextInt(100);
return "partition_" + random.nextInt(1000);
}

private static List<String> randomPartitions() {
Expand All @@ -119,6 +119,7 @@ private static boolean randomBool(double possibility) {
return random.nextInt(1000000) <= upperBound;
}

// define MockCatalog/MockDatabase/MockTable/MockPartition to simulate the real catalog/database/table/partition
private static class MockCatalog {
private String ctlName;
private Map<String, MockDatabase> databases = Maps.newHashMap();
Expand Down Expand Up @@ -193,15 +194,21 @@ public MockDatabase copy() {

private static class MockTable {
private String tblName;
// use this field to mark whether the table has been refreshed
private boolean refreshed;
private Map<String, MockPartition> partitions = Maps.newHashMap();

// Creates a mock table with the given name; the `refreshed` flag and the
// partition map keep their field-initializer defaults.
private MockTable(String tblName) {
this.tblName = tblName;
}

// Marks this mock table as refreshed by setting the `refreshed` flag,
// which takes part in both equals() and hashCode() of MockTable.
public void refresh() {
this.refreshed = true;
}

@Override
public int hashCode() {
return 31 * Objects.hash(tblName) + Arrays.hashCode(
return 31 * Objects.hash(tblName, refreshed) + Arrays.hashCode(
partitions.values().stream().sorted(Comparator.comparing(p -> p.partitionName)).toArray());
}

Expand All @@ -213,6 +220,9 @@ public boolean equals(Object other) {
if (!Objects.equals(this.tblName, ((MockTable) other).tblName)) {
return false;
}
if (refreshed != ((MockTable) other).refreshed) {
return false;
}
Object[] sortedPartitions = partitions.values().stream()
.sorted(Comparator.comparing(p -> p.partitionName)).toArray();
Object[] otherSortedPartitions = ((MockTable) other).partitions.values().stream()
Expand All @@ -229,6 +239,7 @@ public MockTable copy() {

private static class MockPartition {
private String partitionName;
// use this field to mark whether the partition has been refreshed
private boolean refreshed;

private MockPartition(String partitionName) {
Expand All @@ -253,6 +264,7 @@ public boolean equals(Object other) {
}
}

// simulate the processes when handling hms events
private void processEvent(MockCatalog ctl, MetastoreEvent event) {
switch (event.getEventType()) {

Expand Down Expand Up @@ -294,14 +306,16 @@ private void processEvent(MockCatalog ctl, MetastoreEvent event) {
case ALTER_TABLE:
case INSERT:
if (ctl.databases.containsKey(event.getDbName())) {
if (event instanceof AlterTableEvent && ((AlterTableEvent) event).isRename()) {
if (event instanceof AlterTableEvent
&& (((AlterTableEvent) event).isRename() || ((AlterTableEvent) event).isView())) {
ctl.databases.get(event.getDbName()).tables.remove(event.getTblName());
MockTable tbl = new MockTable(((AlterTableEvent) event).getTblNameAfter());
ctl.databases.get(event.getDbName()).tables.put(tbl.tblName, tbl);
} else {
MockTable tbl = ctl.databases.get(event.getDbName()).tables.get(event.getTblName());
if (tbl != null) {
tbl.partitions.clear();
tbl.refresh();
}
}
}
Expand Down Expand Up @@ -359,18 +373,22 @@ private void processEvent(MockCatalog ctl, MetastoreEvent event) {
}

static class EventProducer {
// every type of event has a proportion
// for instance, if the `CreateDatabaseEvent`'s proportion is 1
// and the `AlterDatabaseEvent`'s proportion is 10
// the number of `AlterDatabaseEvent`s produced is about 10 times that of `CreateDatabaseEvent`s
private final List<Integer> proportions;
private final int sumProportion;
private final int sumOfProportions;

EventProducer(List<Integer> proportions) {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(proportions)
&& proportions.size() == eventProducers.size());
this.proportions = ImmutableList.copyOf(proportions);
this.sumProportion = proportions.stream().mapToInt(proportion -> proportion).sum();
this.sumOfProportions = proportions.stream().mapToInt(proportion -> proportion).sum();
}

public MetastoreEvent produceOneEvent(long eventId) {
return eventProducers.get(calIndex(random.nextInt(sumProportion))).apply(eventId);
return eventProducers.get(calIndex(random.nextInt(sumOfProportions))).apply(eventId);
}

private int calIndex(int val) {
Expand All @@ -389,37 +407,66 @@ private int calIndex(int val) {

@Test
public void testCreateBatchEvents() {
// used for catalog initialization, so only CreateXXXEvent / AddXXXEvent are produced
List<Integer> initProportions = Lists.newArrayList(
1, // CreateDatabaseEvent
0, // AlterDatabaseEvent
0, // DropDatabaseEvent
10, // CreateTableEvent
0, // AlterTableEvent
0, // InsertEvent
0, // DropTableEvent
100, // AddPartitionEvent
0, // AlterPartitionEvent
0 // DropPartitionEvent
);

List<Integer> proportions = Lists.newArrayList(
5, // createDatabaseEvent 1
1, // alterDatabaseEvent
5, // dropDatabaseEvent
100, // createTableEvent
1000, // alterTableEvent
1000, // insertEvent
1000, // dropTableEvent
10000, // addPartitionEvent
50000, // alterPartitionEvent
10000 // dropPartitionEvent
5, // CreateDatabaseEvent
1, // AlterDatabaseEvent
5, // DropDatabaseEvent
100, // CreateTableEvent
20000, // AlterTableEvent
2000, // InsertEvent
5000, // DropTableEvent
10000, // AddPartitionEvent
50000, // AlterPartitionEvent
20000 // DropPartitionEvent
);
EventProducer initProducer = new EventProducer(initProportions);
EventProducer producer = new EventProducer(proportions);
for (int i = 0; i < 1000; i++) {

for (int i = 0; i < 100; i++) {
// create a test catalog and do initialization
MockCatalog testCatalog = new MockCatalog(testCtl);
MockCatalog verifyCatalog = testCatalog.copy();
List<MetastoreEvent> initEvents = Lists.newArrayListWithCapacity(1000);
for (int j = 0; j < 10000; j++) {
initEvents.add(initProducer.produceOneEvent(j));
}
for (MetastoreEvent event : initEvents) {
processEvent(testCatalog, event);
}

// copy the test catalog to the validate catalog
MockCatalog validateCatalog = testCatalog.copy();

List<MetastoreEvent> events = Lists.newArrayListWithCapacity(1000);
for (int j = 0; j < 1000; j++) {
for (int j = 0; j < 5000; j++) {
events.add(producer.produceOneEvent(j));
}
List<MetastoreEvent> mergedEvents = factory.createBatchEvents(testCtl, events);

for (MetastoreEvent event : events) {
processEvent(verifyCatalog, event);
processEvent(validateCatalog, event);
}

for (MetastoreEvent event : mergedEvents) {
processEvent(testCatalog, event);
}

Assertions.assertEquals(testCatalog, verifyCatalog);
// the test catalog should be equal to the validate catalog;
// otherwise there must be a bug in `factory.createBatchEvents()`
Assertions.assertEquals(testCatalog, validateCatalog);
}
}
}

0 comments on commit b6ae374

Please sign in to comment.