-
Notifications
You must be signed in to change notification settings - Fork 2.5k
chore(ci): Hudi-utilities test improvements #17758
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b13d578
57b418f
157de4f
2c5cfca
21f6279
8f13b37
0e21622
6dfb53d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -244,10 +244,5 @@ | |
| <artifactId>testcontainers</artifactId> | ||
| <scope>compile</scope> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.testcontainers</groupId> | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Moved this to be where it is required |
||
| <artifactId>localstack</artifactId> | ||
| <scope>compile</scope> | ||
| </dependency> | ||
| </dependencies> | ||
| </project> | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -459,12 +459,19 @@ private static String resetTarget(Config configuration, String database, String | |
| */ | ||
| public void sync() { | ||
| for (TableExecutionContext context : tableExecutionContexts) { | ||
| HoodieStreamer streamer = null; | ||
| try { | ||
| new HoodieStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync(); | ||
| streamer = new HoodieStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())); | ||
| streamer.sync(); | ||
| successTables.add(Helpers.getTableWithDatabase(context)); | ||
| streamer.shutdownGracefully(); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ensure the instances are shut down properly
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor: should we move this to a finally block?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, makes sense |
||
| } catch (Exception e) { | ||
| LOG.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e); | ||
| failedTables.add(Helpers.getTableWithDatabase(context)); | ||
| } finally { | ||
| if (streamer != null) { | ||
| streamer.shutdownGracefully(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,7 @@ | |
| import org.apache.hudi.utilities.sources.TestDataSource; | ||
| import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch; | ||
| import org.apache.hudi.utilities.streamer.HoodieStreamer; | ||
| import org.apache.hudi.utilities.testutils.KafkaTestUtils; | ||
| import org.apache.hudi.utilities.testutils.UtilitiesTestBase; | ||
|
|
||
| import org.apache.avro.Schema; | ||
|
|
@@ -62,7 +63,6 @@ | |
| import org.apache.kafka.common.serialization.ByteArrayDeserializer; | ||
| import org.apache.spark.sql.Row; | ||
| import org.apache.spark.sql.SQLContext; | ||
| import org.apache.spark.streaming.kafka010.KafkaTestUtils; | ||
| import org.junit.jupiter.api.AfterAll; | ||
| import org.junit.jupiter.api.AfterEach; | ||
| import org.junit.jupiter.api.BeforeAll; | ||
|
|
@@ -140,15 +140,12 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { | |
| protected static String topicName; | ||
| protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName(); | ||
| protected static int testNum = 1; | ||
|
|
||
| Map<String, String> hudiOpts = new HashMap<>(); | ||
| public KafkaTestUtils testUtils; | ||
| protected static KafkaTestUtils testUtils; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Tests now only set up Kafka once to cut down on the overhead |
||
| protected Map<String, String> hudiOpts = new HashMap<>(); | ||
|
|
||
| @BeforeEach | ||
| protected void prepareTestSetup() throws IOException { | ||
| setupTest(); | ||
| testUtils = new KafkaTestUtils(); | ||
| testUtils.setup(); | ||
| topicName = "topic" + testNum; | ||
| prepareInitialConfigs(storage, basePath, testUtils.brokerAddress()); | ||
| // reset TestDataSource recordInstantTime which may be set by any other test | ||
|
|
@@ -157,13 +154,10 @@ protected void prepareTestSetup() throws IOException { | |
|
|
||
| @AfterEach | ||
| public void cleanupKafkaTestUtils() { | ||
| if (testUtils != null) { | ||
| testUtils.teardown(); | ||
| testUtils = null; | ||
| } | ||
| if (hudiOpts != null) { | ||
| hudiOpts = null; | ||
| } | ||
| testUtils.deleteTopics(); | ||
| } | ||
|
|
||
| @BeforeAll | ||
|
|
@@ -173,10 +167,14 @@ public static void initClass() throws Exception { | |
| PARQUET_SOURCE_ROOT = basePath + "parquetFiles"; | ||
| ORC_SOURCE_ROOT = basePath + "orcFiles"; | ||
| JSON_KAFKA_SOURCE_ROOT = basePath + "jsonKafkaFiles"; | ||
| testUtils = new KafkaTestUtils().setup(); | ||
| } | ||
|
|
||
| @AfterAll | ||
| public static void tearDown() { | ||
| if (testUtils != null) { | ||
| testUtils.teardown(); | ||
| } | ||
| UtilitiesTestBase.cleanUpUtilitiesTestServices(); | ||
| } | ||
|
|
||
|
|
@@ -704,31 +702,31 @@ static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, S | |
| static void assertAtleastNCompactionCommits(int minExpected, String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage, tablePath); | ||
| HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed logging to avoid the toString calls while I was updating this file |
||
| int numCompactionCommits = timeline.countInstants(); | ||
| assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); | ||
| } | ||
|
|
||
| static void assertAtleastNDeltaCommits(int minExpected, String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numDeltaCommits = timeline.countInstants(); | ||
| assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); | ||
| } | ||
|
|
||
| static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numCompactionCommits = timeline.countInstants(); | ||
| assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); | ||
| } | ||
|
|
||
| static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numDeltaCommits = timeline.countInstants(); | ||
| assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); | ||
| } | ||
|
|
@@ -783,55 +781,55 @@ static void waitFor(BooleanSupplier booleanSupplier) { | |
| static void assertAtLeastNCommits(int minExpected, String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numDeltaCommits = timeline.countInstants(); | ||
| assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); | ||
| } | ||
|
|
||
| static void assertAtLeastNReplaceCommits(int minExpected, String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numDeltaCommits = timeline.countInstants(); | ||
| assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); | ||
| } | ||
|
|
||
| static void assertPendingIndexCommit(String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.reloadActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numIndexCommits = timeline.countInstants(); | ||
| assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1"); | ||
| } | ||
|
|
||
| static void assertCompletedIndexCommit(String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.reloadActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numIndexCommits = timeline.countInstants(); | ||
| assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1"); | ||
| } | ||
|
|
||
| static void assertNoReplaceCommits(String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numDeltaCommits = timeline.countInstants(); | ||
| assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 0); | ||
| } | ||
|
|
||
| static void assertAtLeastNClusterRequests(int minExpected, String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.getActiveTimeline().filterPendingClusteringTimeline(); | ||
| LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numDeltaCommits = timeline.countInstants(); | ||
| assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); | ||
| } | ||
|
|
||
| static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback, int minExpectedCommits, String tablePath) { | ||
| HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); | ||
| HoodieTimeline timeline = meta.getActiveTimeline().getRollbackTimeline().filterCompletedInstants(); | ||
| LOG.info("Rollback Timeline Instants=" + meta.getActiveTimeline().getInstants()); | ||
| LOG.info("Rollback Timeline Instants={}", meta.getActiveTimeline().getInstants()); | ||
| int numRollbackCommits = timeline.countInstants(); | ||
| assertTrue(minExpectedRollback <= numRollbackCommits, "Got=" + numRollbackCommits + ", exp >=" + minExpectedRollback); | ||
| HoodieInstant firstRollback = timeline.getInstants().get(0); | ||
|
|
@@ -842,4 +840,22 @@ static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback, int minE | |
| assertTrue(minExpectedCommits <= numCommits, "Got=" + numCommits + ", exp >=" + minExpectedCommits); | ||
| } | ||
| } | ||
|
|
||
| protected void syncOnce(HoodieDeltaStreamer.Config cfg) throws Exception { | ||
| HoodieStreamer streamer = new HoodieDeltaStreamer(cfg, jsc); | ||
| streamer.sync(); | ||
| streamer.shutdownGracefully(); | ||
| } | ||
|
|
||
| protected void syncOnce(HoodieStreamer streamer) throws Exception { | ||
| try { | ||
| streamer.sync(); | ||
| } finally { | ||
| streamer.shutdownGracefully(); | ||
| } | ||
| } | ||
|
|
||
| protected void syncOnce(HoodieStreamer.Config cfg, Option<TypedProperties> properties) throws Exception { | ||
| syncOnce(new HoodieStreamer(cfg, jsc, properties)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is required for running Docker-in-Docker