diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 61a5418d5348c..a458f9f6a2db8 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -126,7 +126,7 @@ stages:
jobs:
- job: UT_FT_1
displayName: UT hudi-hadoop-common & UT FT client/spark-client
- timeoutInMinutes: '90'
+ timeoutInMinutes: '120'
steps:
- task: Maven@4
displayName: maven install
@@ -180,7 +180,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_2
displayName: FTA hudi-spark
- timeoutInMinutes: '180'
+ timeoutInMinutes: '120'
steps:
- task: Maven@4
displayName: maven install
@@ -250,7 +250,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_4
displayName: UT spark-datasource Java Test 2
- timeoutInMinutes: '90'
+ timeoutInMinutes: '120'
steps:
- task: Maven@4
displayName: maven install
@@ -285,7 +285,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_5
displayName: UT spark-datasource DML
- timeoutInMinutes: '90'
+ timeoutInMinutes: '120'
steps:
- task: Maven@4
displayName: maven install
@@ -320,7 +320,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_6
displayName: UT spark-datasource DDL & Others
- timeoutInMinutes: '90'
+ timeoutInMinutes: '120'
steps:
- task: Maven@4
displayName: maven install
@@ -355,7 +355,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_7
displayName: UT Hudi Streamer & FT utilities
- timeoutInMinutes: '90'
+ timeoutInMinutes: '120'
steps:
- task: Docker@2
displayName: "login to docker hub"
@@ -378,6 +378,7 @@ stages:
command: 'run'
arguments: >
-v $(Build.SourcesDirectory):/hudi
+ -v /var/run/docker.sock:/var/run/docker.sock
-i docker.io/apachehudi/hudi-ci-bundle-validation-base:$(Build.BuildId)
/bin/bash -c "mvn clean install $(MVN_OPTS_INSTALL) -Phudi-platform-service -Pthrift-gen-source -pl hudi-utilities -am
&& mvn test $(MVN_OPTS_TEST) -Punit-tests $(JACOCO_AGENT_DESTFILE1_ARG) -Dtest="TestHoodie*" -Dsurefire.failIfNoSpecifiedTests=false -pl hudi-utilities
@@ -404,7 +405,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_8
displayName: UT FT Spark and SQL (additional)
- timeoutInMinutes: '110'
+ timeoutInMinutes: '120'
steps:
- task: Maven@4
displayName: maven install
@@ -457,7 +458,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_9
displayName: FT spark 2
- timeoutInMinutes: '90'
+ timeoutInMinutes: '120'
steps:
- task: Maven@4
displayName: maven install
@@ -492,7 +493,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_10
displayName: UT FT common & other modules
- timeoutInMinutes: '180'
+ timeoutInMinutes: '120'
steps:
- task: Docker@2
displayName: "login to docker hub"
@@ -515,6 +516,7 @@ stages:
command: 'run'
arguments: >
-v $(Build.SourcesDirectory):/hudi
+ -v /var/run/docker.sock:/var/run/docker.sock
-i docker.io/apachehudi/hudi-ci-bundle-validation-base:$(Build.BuildId)
/bin/bash -c "mvn clean install $(MVN_OPTS_INSTALL) -Phudi-platform-service -Pthrift-gen-source
&& mvn test $(MVN_OPTS_TEST) -Punit-tests -Dsurefire.failIfNoSpecifiedTests=false $(JACOCO_AGENT_DESTFILE1_ARG) -pl $(JOB10_UT_MODULES)
diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml
index a4f95fcb9f8ce..37b5a299ea83c 100644
--- a/hudi-aws/pom.xml
+++ b/hudi-aws/pom.xml
@@ -251,6 +251,11 @@
      <artifactId>kryo-shaded</artifactId>
      <scope>test</scope>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>localstack</artifactId>
+      <scope>test</scope>
+    </dependency>
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
index 4e2bb4dcfe6de..7273077786f30 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
@@ -53,7 +53,7 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
TransactionManager transactionManager;
@BeforeEach
- private void init(TestInfo testInfo) throws IOException {
+ void init(TestInfo testInfo) throws IOException {
initPath();
initMetaClient();
this.writeConfig = getWriteConfig(testInfo.getTags().contains("useLockProviderWithRuntimeError"));
diff --git a/hudi-tests-common/pom.xml b/hudi-tests-common/pom.xml
index 80a35741ecfdb..6aab6f1d9c154 100644
--- a/hudi-tests-common/pom.xml
+++ b/hudi-tests-common/pom.xml
@@ -244,10 +244,5 @@
      <artifactId>testcontainers</artifactId>
      <scope>compile</scope>
-    <dependency>
-      <groupId>org.testcontainers</groupId>
-      <artifactId>localstack</artifactId>
-      <scope>compile</scope>
-    </dependency>
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index dcb3f1729b1b9..121e20552cd0c 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -312,14 +312,6 @@
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
-      <version>${spark.version}</version>
-      <classifier>tests</classifier>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
@@ -516,6 +508,18 @@
      <scope>test</scope>
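+    <!-- Kafka integration tests now rely on Testcontainers instead of the spark-streaming-kafka test utilities -->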
+
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <scope>test</scope>
+    </dependency>
+
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
index 0693c3b7f1ea8..61bd18af039be 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
@@ -459,12 +459,19 @@ private static String resetTarget(Config configuration, String database, String
*/
public void sync() {
for (TableExecutionContext context : tableExecutionContexts) {
+ HoodieStreamer streamer = null;
try {
- new HoodieStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
+ streamer = new HoodieStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties()));
+ streamer.sync();
successTables.add(Helpers.getTableWithDatabase(context));
} catch (Exception e) {
LOG.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
failedTables.add(Helpers.getTableWithDatabase(context));
+ } finally {
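+ // Always release the streamer's resources, whether this table's sync succeeded or failed.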
+ if (streamer != null) {
+ streamer.shutdownGracefully();
+ }
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
index ce058e8f44777..d3d3b0703d839 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/callback/TestKafkaCallbackProvider.java
@@ -27,9 +27,9 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
+import org.apache.hudi.utilities.testutils.KafkaTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
-import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index ca3edb212195f..ad36b5f563b28 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -54,6 +54,7 @@
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
+import org.apache.hudi.utilities.testutils.KafkaTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.Schema;
@@ -62,7 +63,6 @@
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
-import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -140,15 +140,12 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
protected static String topicName;
protected static String defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
protected static int testNum = 1;
-
- Map<String, String> hudiOpts = new HashMap<>();
- public KafkaTestUtils testUtils;
+ protected static KafkaTestUtils testUtils;
+ protected Map<String, String> hudiOpts = new HashMap<>();
@BeforeEach
protected void prepareTestSetup() throws IOException {
setupTest();
- testUtils = new KafkaTestUtils();
- testUtils.setup();
topicName = "topic" + testNum;
prepareInitialConfigs(storage, basePath, testUtils.brokerAddress());
// reset TestDataSource recordInstantTime which may be set by any other test
@@ -157,13 +154,10 @@ protected void prepareTestSetup() throws IOException {
@AfterEach
public void cleanupKafkaTestUtils() {
- if (testUtils != null) {
- testUtils.teardown();
- testUtils = null;
- }
if (hudiOpts != null) {
hudiOpts = null;
}
+ testUtils.deleteTopics();
}
@BeforeAll
@@ -173,10 +167,14 @@ public static void initClass() throws Exception {
PARQUET_SOURCE_ROOT = basePath + "parquetFiles";
ORC_SOURCE_ROOT = basePath + "orcFiles";
JSON_KAFKA_SOURCE_ROOT = basePath + "jsonKafkaFiles";
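+ // Share one KafkaTestUtils instance across the class; tests remove their topics in @AfterEach and the instance is torn down in @AfterAll.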
+ testUtils = new KafkaTestUtils().setup();
}
@AfterAll
public static void tearDown() {
+ if (testUtils != null) {
+ testUtils.teardown();
+ }
UtilitiesTestBase.cleanUpUtilitiesTestServices();
}
@@ -704,7 +702,7 @@ static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, S
static void assertAtleastNCompactionCommits(int minExpected, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage, tablePath);
HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numCompactionCommits = timeline.countInstants();
assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected);
}
@@ -712,7 +710,7 @@ static void assertAtleastNCompactionCommits(int minExpected, String tablePath) {
static void assertAtleastNDeltaCommits(int minExpected, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
@@ -720,7 +718,7 @@ static void assertAtleastNDeltaCommits(int minExpected, String tablePath) {
static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numCompactionCommits = timeline.countInstants();
assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected);
}
@@ -728,7 +726,7 @@ static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String l
static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
@@ -783,7 +781,7 @@ static void waitFor(BooleanSupplier booleanSupplier) {
static void assertAtLeastNCommits(int minExpected, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
@@ -791,7 +789,7 @@ static void assertAtLeastNCommits(int minExpected, String tablePath) {
static void assertAtLeastNReplaceCommits(int minExpected, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
@@ -799,7 +797,7 @@ static void assertAtLeastNReplaceCommits(int minExpected, String tablePath) {
static void assertPendingIndexCommit(String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.reloadActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numIndexCommits = timeline.countInstants();
assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
}
@@ -807,7 +805,7 @@ static void assertPendingIndexCommit(String tablePath) {
static void assertCompletedIndexCommit(String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.reloadActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numIndexCommits = timeline.countInstants();
assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
}
@@ -815,7 +813,7 @@ static void assertCompletedIndexCommit(String tablePath) {
static void assertNoReplaceCommits(String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 0);
}
@@ -823,7 +821,7 @@ static void assertNoReplaceCommits(String tablePath) {
static void assertAtLeastNClusterRequests(int minExpected, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.getActiveTimeline().filterPendingClusteringTimeline();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numDeltaCommits = timeline.countInstants();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
@@ -831,7 +829,7 @@ static void assertAtLeastNClusterRequests(int minExpected, String tablePath) {
static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback, int minExpectedCommits, String tablePath) {
HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath);
HoodieTimeline timeline = meta.getActiveTimeline().getRollbackTimeline().filterCompletedInstants();
- LOG.info("Rollback Timeline Instants=" + meta.getActiveTimeline().getInstants());
+ LOG.info("Rollback Timeline Instants={}", meta.getActiveTimeline().getInstants());
int numRollbackCommits = timeline.countInstants();
assertTrue(minExpectedRollback <= numRollbackCommits, "Got=" + numRollbackCommits + ", exp >=" + minExpectedRollback);
HoodieInstant firstRollback = timeline.getInstants().get(0);
@@ -842,4 +840,22 @@ static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback, int minE
assertTrue(minExpectedCommits <= numCommits, "Got=" + numCommits + ", exp >=" + minExpectedCommits);
}
}
+
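+ // Helpers that run a single sync round and always shut the streamer down, so tests release its resources even if sync() throws.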
+ protected void syncOnce(HoodieDeltaStreamer.Config cfg) throws Exception {
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc));
+ }
+
+ protected void syncOnce(HoodieStreamer streamer) throws Exception {
+ try {
+ streamer.sync();
+ } finally {
+ streamer.shutdownGracefully();
+ }
+ }
+
+ protected void syncOnce(HoodieStreamer.Config cfg, Option<TypedProperties> properties) throws Exception {
+ syncOnce(new HoodieStreamer(cfg, jsc, properties));
+ }
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 85746e3ea7f56..2f026e40d3957 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -193,6 +193,7 @@
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD;
import static org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -433,11 +434,9 @@ public void testInferKeyGenerator(String propsFilename,
String expectedKeyGeneratorClassName) throws Exception {
String[] splitNames = propsFilename.split("\\.");
String tableBasePath = basePath + "/" + splitNames[0];
- HoodieDeltaStreamer deltaStreamer =
- new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
+ syncOnce(TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()),
- propsFilename, false), jsc);
- deltaStreamer.sync();
+ propsFilename, false));
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HoodieTestUtils.getDefaultStorageConf()).setBasePath(tableBasePath).build();
assertEquals(
@@ -461,9 +460,7 @@ private static void assertUseV2Checkpoint(HoodieTableMetaClient metaClient) {
public void testTableCreation() throws Exception {
Exception e = assertThrows(TableNotFoundException.class, () -> {
fs.mkdirs(new Path(basePath + "/not_a_table"));
- HoodieDeltaStreamer deltaStreamer =
- new HoodieDeltaStreamer(TestHelpers.makeConfig(basePath + "/not_a_table", WriteOperationType.BULK_INSERT), jsc);
- deltaStreamer.sync();
+ syncOnce(TestHelpers.makeConfig(basePath + "/not_a_table", WriteOperationType.BULK_INSERT));
}, "Should error out when pointed out at a dir thats not a table");
// expected
LOG.debug("Expected error during table creation", e);
@@ -483,6 +480,7 @@ public void testTableCreationContainsHiveStylePartitioningEnable(boolean configF
HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(context, tablePath);
assertEquals(configFlag, Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable()));
assertEquals(configFlag, Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning()));
+ deltaStreamer.shutdownGracefully();
}
@ParameterizedTest
@@ -498,6 +496,7 @@ public void testPartitionKeyFieldsBasedOnVersion(HoodieTableVersion version) thr
HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(context, tablePath);
String expectedPartitionFields = version.equals(HoodieTableVersion.SIX) ? "partition_path" : "partition_path:simple";
assertEquals(expectedPartitionFields, metaClient.getTableConfig().getString(HoodieTableConfig.PARTITION_FIELDS));
+ deltaStreamer.shutdownGracefully();
}
@ParameterizedTest
@@ -539,17 +538,16 @@ public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecordType recordType)
cfg.configs.add("hoodie.bootstrap.parallelism=5");
cfg.configs.add(String.format("%s=false", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()));
cfg.targetBasePath = newDatasetBasePath;
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
Dataset<Row> res = sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath);
- LOG.info("Schema :");
- res.printSchema();
+ LOG.info("Schema : {}", res.schema());
assertRecordCount(1950, newDatasetBasePath, sqlContext);
res.registerTempTable("bootstrapped");
assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count());
// NOTE: To fetch record's count Spark will optimize the query fetching minimal possible amount
// of data, which might not provide adequate amount of test coverage
- sqlContext.sql("select * from bootstrapped").show();
+ assertDoesNotThrow(() -> sqlContext.sql("select * from bootstrapped").collect());
StructField[] fields = res.schema().fields();
List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
@@ -592,7 +590,7 @@ public void testModifiedTableConfigs() throws Exception {
}
private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception {
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertRecordCount(expected, tableBasePath, sqlContext);
assertDistanceCount(expected, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata(metadata, tableBasePath, totalCommits);
@@ -615,7 +613,7 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
assertRecordCount(1000, tableBasePath, sqlContext);
@@ -628,7 +626,7 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
assertRecordCount(1450, tableBasePath, sqlContext);
@@ -653,7 +651,7 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
}
cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
// again, 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
assertRecordCount(1900, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00002", tableBasePath, 3);
@@ -694,7 +692,7 @@ public void testTimestampMillis() throws Exception {
cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -714,7 +712,7 @@ public void testTimestampMillis() throws Exception {
cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
@@ -755,7 +753,7 @@ public void testLogicalTypes() throws Exception {
cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -775,7 +773,7 @@ public void testLogicalTypes() throws Exception {
cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0"));
cfg.configs.add("hoodie.datasource.write.row.writer.enable=false");
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
@@ -819,7 +817,7 @@ void testLogicalTypesWithJsonSource(boolean hasTransformer) throws Exception {
prepareJsonKafkaDFSFilesWithSchema(
1000, true, topicName, schemaStr);
HoodieDeltaStreamer.Config cfg = getConfigForLogicalTypesWithJsonSource(tableBasePath, WriteOperationType.INSERT, hasTransformer);
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
// Validate.
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
assertRecordCount(1000, tableBasePath, sqlContext);
@@ -835,7 +833,7 @@ void testLogicalTypesWithJsonSource(boolean hasTransformer) throws Exception {
prepareJsonKafkaDFSFilesWithSchema(
1000, false, topicName, schemaStr);
cfg = getConfigForLogicalTypesWithJsonSource(tableBasePath, WriteOperationType.UPSERT, hasTransformer);
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
// Validate.
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
assertRecordCount(2000, tableBasePath, sqlContext);
@@ -905,7 +903,7 @@ public void testBackwardsCompatibility(HoodieTableVersion version) throws Except
cfg.forceDisableCompaction = true;
cfg.sourceLimit = 100_000;
cfg.ignoreCheckpoint = "12345";
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
logicalAssertions(tableSchema, tableBasePath, hudiOpts, version.versionCode());
});
}
@@ -1044,8 +1042,7 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati
Option<TypedProperties> propt = Option.of(properties);
- new HoodieStreamer(prepCfgForCowLogicalRepair(tableBasePath, "456"), jsc, propt).sync();
-
+ syncOnce(prepCfgForCowLogicalRepair(tableBasePath, "456"), propt);
inputDataPath = getClass().getClassLoader().getResource("logical-repair/cow_write_updates/3").toURI().toString();
propt.get().setProperty("hoodie.streamer.source.dfs.root", inputDataPath);
@@ -1055,7 +1052,7 @@ void testCOWLogicalRepair(String tableVersion, String recordType, String operati
propt.get().setProperty("hoodie.clustering.plan.strategy.single.group.clustering.enabled", "true");
propt.get().setProperty("hoodie.clustering.plan.strategy.sort.columns", "ts_millis,_row_key");
}
- new HoodieStreamer(prepCfgForCowLogicalRepair(tableBasePath, "789"), jsc, propt).sync();
+ syncOnce(prepCfgForCowLogicalRepair(tableBasePath, "789"), propt);
String prevTimezone = sparkSession.conf().get("spark.sql.session.timeZone");
try {
@@ -1165,7 +1162,7 @@ void testMORLogicalRepair(String tableVersion, String recordType, String operati
Option<TypedProperties> propt = Option.of(properties);
- new HoodieStreamer(prepCfgForMorLogicalRepair(tableBasePath, dirName, "123", disableCompaction), jsc, propt).sync();
+ syncOnce(prepCfgForMorLogicalRepair(tableBasePath, dirName, "123", disableCompaction), propt);
String prevTimezone = sparkSession.conf().get("spark.sql.session.timeZone");
try {
@@ -1375,8 +1372,7 @@ public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
cfg.configs.add(String.format("%s=%s", HoodieMetricsConfig.TURN_METRICS_ON.key(), "true"));
cfg.configs.add(String.format("%s=%s", HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.INMEMORY.name()));
cfg.continuousMode = false;
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- ds.sync();
+ syncOnce(cfg);
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown");
@@ -1405,8 +1401,7 @@ public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
cfg.configs.add(String.format("%s=%s", HoodieMetricsConfig.TURN_METRICS_ON.key(), "true"));
cfg.configs.add(String.format("%s=%s", HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.INMEMORY.name()));
cfg.continuousMode = false;
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- ds.sync();
+ syncOnce(cfg);
assertUseV2Checkpoint(HoodieTestUtils.createMetaClient(storage, tableBasePath));
assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be shutdown");
@@ -1545,8 +1540,7 @@ public void testDeltaSyncWithPendingClustering() throws Exception {
cfg.continuousMode = false;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- ds.sync();
+ syncOnce(cfg);
// assert ingest successful
TestHelpers.assertAtLeastNCommits(1, tableBasePath);
@@ -1561,8 +1555,7 @@ public void testDeltaSyncWithPendingClustering() throws Exception {
// do another ingestion with inline clustering enabled
cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true", "2", "", ""));
cfg.retryLastPendingInlineClusteringJob = true;
- HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
- ds2.sync();
+ syncOnce(cfg);
String completeClusteringTimeStamp = meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().requestedTime();
assertEquals(clusteringRequest.requestedTime(), completeClusteringTimeStamp);
TestHelpers.assertAtLeastNCommits(2, tableBasePath);
@@ -1595,6 +1588,7 @@ public void testDeltaSyncWithPendingCompaction() throws Exception {
deltaStreamer.sync();
TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath);
TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath);
+ deltaStreamer.shutdownGracefully();
// delete compaction commit
HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient(storage, tableBasePath);
@@ -1611,6 +1605,7 @@ public void testDeltaSyncWithPendingCompaction() throws Exception {
meta = HoodieTestUtils.createMetaClient(storage, tableBasePath);
timeline = meta.getActiveTimeline().getRollbackTimeline();
assertEquals(1, timeline.getInstants().size());
+ deltaStreamer.shutdownGracefully();
}
@ParameterizedTest
@@ -1649,10 +1644,10 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws
replacedTimeline.readReplaceCommitMetadata(firstReplaceHoodieInstant.get());
Map<String, List<String>> partitionToReplaceFileIds = firstReplaceMetadata.getPartitionToReplaceFileIds();
String partitionName = null;
- List replacedFileIDs = null;
- for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
+ List<String> replacedFileIDs = null;
+ for (Map.Entry<String, List<String>> entry : partitionToReplaceFileIds.entrySet()) {
partitionName = String.valueOf(entry.getKey());
- replacedFileIDs = (List) entry.getValue();
+ replacedFileIDs = entry.getValue();
}
assertNotNull(partitionName);
@@ -1696,8 +1691,7 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws
// timeline as of now. no cleaner and archival kicked in.
// c1, c2, rc3, c4, c5, rc6,
- ds = new HoodieDeltaStreamer(cfg, jsc);
- ds.sync();
+ syncOnce(cfg);
// after 1 round of sync, timeline will be as follows
// just before clean
// c1, c2, rc3, c4, c5, rc6, c7
@@ -1740,9 +1734,7 @@ public void testReleaseResources(boolean testFailureCase) throws Exception {
cfg.continuousMode = false;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false"));
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- ds.sync();
- ds.shutdownGracefully();
+ syncOnce(cfg);
// assert ingest successful
TestHelpers.assertAtLeastNCommits(1, tableBasePath);
@@ -2151,8 +2143,7 @@ public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "false", "0", "false", "0"));
cfg.configs.addAll(getAllMultiWriterConfigs());
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- ds.sync();
+ syncOnce(cfg);
// assert ingest successful
TestHelpers.assertAtLeastNCommits(1, tableBasePath);
@@ -2162,8 +2153,7 @@ public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob
schedule.cluster(0);
// do another ingestion
- HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
- ds2.sync();
+ syncOnce(cfg);
// convert clustering request into inflight, Simulate the last clustering failed scenario
HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient(storage, tableBasePath);
@@ -2282,18 +2272,16 @@ private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List
useSchemaProvider, 100000, false, null, null, "timestamp", null, false, hoodieTableVersion);
cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() + "=true");
cfg.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + hoodieTableVersion.versionCode());
- HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
- deltaStreamer.sync();
+ syncOnce(cfg);
assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
- deltaStreamer.shutdownGracefully();
+ HoodieStreamer deltaStreamer = null;
try {
if (testEmptyBatch) {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "partition_path", "0");
prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
- deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
- deltaStreamer.sync();
+ syncOnce(cfg);
// since we mimic'ed empty batch, total records should be same as first sync().
assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
HoodieTableMetaClient metaClient = createMetaClient(jsc, tableBasePath);
@@ -2305,7 +2293,6 @@ private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List
compareLatestTwoSchemas(metaClient);
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "partition_path", "");
- deltaStreamer.shutdownGracefully();
}
int recordsSoFar = 100;
@@ -2356,7 +2343,7 @@ public void testBulkInsertRowWriterContinuousModeWithInlineClusteringAmbiguousDa
"false", ""), true);
}
- private void testBulkInsertRowWriterContinuousMode(Boolean useSchemaProvider, List<String> transformerClassNames,
+ private void testBulkInsertRowWriterContinuousMode(boolean useSchemaProvider, List<String> transformerClassNames,
    boolean testEmptyBatch, List<String> customConfigs, boolean makeDatesAmbiguous) throws Exception {
PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
int parquetRecordsCount = 100;
@@ -2371,13 +2358,13 @@ private void testBulkInsertRowWriterContinuousMode(Boolean useSchemaProvider, Li
try {
int counter = 2;
while (counter < 100) { // lets keep going. if the test times out, we will cancel the future within finally. So, safe to generate 100 batches.
- LOG.info("Generating data for batch " + counter);
+ LOG.info("Generating data for batch {}", counter);
prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(counter) + ".parquet", false, null, null, makeDatesAmbiguous);
counter++;
Thread.sleep(2000);
}
} catch (Exception ex) {
- LOG.warn("Input data generation failed", ex.getMessage());
+ LOG.warn("Input data generation failed", ex);
throw new RuntimeException(ex.getMessage(), ex);
}
});
@@ -2440,7 +2427,7 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, WriteOperationType.BULK_INSERT,
true, null);
addRecordMerger(recordType, downstreamCfg.configs);
- new HoodieDeltaStreamer(downstreamCfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(downstreamCfg, jsc, fs, hiveServer.getHiveConf()));
assertRecordCount(1000, downstreamTableBasePath, sqlContext);
assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
assertDistanceCountWithExactValue(1000, downstreamTableBasePath, sqlContext);
@@ -2448,7 +2435,7 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
// No new data => no commits for upstream table
cfg.sourceLimit = 0;
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()));
assertRecordCount(1000, tableBasePath, sqlContext);
assertDistanceCount(1000, tableBasePath, sqlContext);
assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
@@ -2458,7 +2445,7 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
HoodieDeltaStreamer.Config downstreamCfg1 =
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
WriteOperationType.BULK_INSERT, true, DummySchemaProvider.class.getName());
- new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
+ syncOnce(downstreamCfg1);
assertRecordCount(1000, downstreamTableBasePath, sqlContext);
assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
assertDistanceCountWithExactValue(1000, downstreamTableBasePath, sqlContext);
@@ -2467,7 +2454,7 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
// upsert() #1 on upstream hudi table
cfg.sourceLimit = 2000;
cfg.operation = WriteOperationType.UPSERT;
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()));
assertRecordCount(1950, tableBasePath, sqlContext);
assertDistanceCount(1950, tableBasePath, sqlContext);
assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext);
@@ -2481,7 +2468,7 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
false, null);
addRecordMerger(recordType, downstreamCfg.configs);
downstreamCfg.sourceLimit = 2000;
- new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+ syncOnce(downstreamCfg);
assertRecordCount(2000, downstreamTableBasePath, sqlContext);
assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
assertDistanceCountWithExactValue(2000, downstreamTableBasePath, sqlContext);
@@ -2499,16 +2486,17 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
.setBasePath(tableBasePath)
.setLoadActiveTimelineOnLoad(true)
.build();
- HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig, metaClient);
- final String tableName = hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
- assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
- assertEquals(3, hiveClient.getAllPartitions(tableName).size(),
- "Table partitions should match the number of partitions we wrote");
- assertEquals(lastInstantForUpstreamTable.requestedTime(),
- hiveClient.getLastCommitTimeSynced(tableName).get(),
- "The last commit that was synced should be updated in the TBLPROPERTIES");
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
- UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, downstreamTableBasePath);
+ try (HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig, metaClient)) {
+ final String tableName = hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
+ assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
+ assertEquals(3, hiveClient.getAllPartitions(tableName).size(),
+ "Table partitions should match the number of partitions we wrote");
+ assertEquals(lastInstantForUpstreamTable.requestedTime(),
+ hiveClient.getLastCommitTimeSynced(tableName).get(),
+ "The last commit that was synced should be updated in the TBLPROPERTIES");
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, downstreamTableBasePath);
+ }
}
@Test
@@ -2518,7 +2506,7 @@ public void testNullSchemaProvider() {
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
false, false, null, null);
Exception e = assertThrows(HoodieException.class, () -> {
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()));
}, "Should error out when schema provider is not provided");
LOG.debug("Expected error during reading data from source ", e);
assertTrue(e.getMessage().contains("Schema provider is required for this operation and for the source of interest. "
@@ -2534,7 +2522,7 @@ public void testPayloadClassUpdate() throws Exception {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
true, false, null, "MERGE_ON_READ");
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()));
assertRecordCount(1000, dataSetBasePath, sqlContext);
HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, dataSetBasePath, false);
assertEquals(metaClient.getTableConfig().getPayloadClass(), DefaultHoodieRecordPayload.class.getName());
@@ -2556,7 +2544,7 @@ public void testPartialPayloadClass() throws Exception {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
true, true, PartialUpdateAvroPayload.class.getName(), "MERGE_ON_READ");
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()));
assertRecordCount(1000, dataSetBasePath, sqlContext);
//now assert that hoodie.properties file now has updated payload class name
@@ -2571,7 +2559,7 @@ public void testPayloadClassUpdateWithCOWTable() throws Exception {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
true, false, null, null);
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()));
assertRecordCount(1000, dataSetBasePath, sqlContext);
Properties props = new Properties();
@@ -2624,7 +2612,7 @@ public void testFilterDupesWithPrecombine(
cfg.filterDupes = true;
cfg.sourceOrderingFields = sourceOrderingField;
addRecordMerger(recordType, cfg.configs);
- new HoodieStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -2652,7 +2640,7 @@ public void testDeltaStreamerWithMultipleOrderingFields(HoodieTableType tableTyp
cfg.recordMergeStrategyId = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
TestDataSource.recordInstantTime = Option.of("002");
- new HoodieStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -2698,7 +2686,7 @@ public void testDeltaStreamerFailureWithChangingOrderingFields(HoodieTableType t
cfg.recordMergeStrategyId = HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
TestDataSource.recordInstantTime = Option.of("001");
- new HoodieStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -2708,7 +2696,7 @@ public void testDeltaStreamerFailureWithChangingOrderingFields(HoodieTableType t
TestDataSource.recordInstantTime = Option.of("002");
runStreamSync(cfg, false, 10, WriteOperationType.UPSERT);
});
- assertTrue(e.getMessage().equals("Configured ordering fields: timestamp do not match table ordering fields: [timestamp, rider]"));
+ assertEquals("Configured ordering fields: timestamp do not match table ordering fields: [timestamp, rider]", e.getMessage());
}
private static long getNumUpdates(HoodieCommitMetadata metadata) {
@@ -2724,7 +2712,7 @@ public void testFilterDupes() throws Exception {
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
@@ -2759,7 +2747,7 @@ public void testFilterDupes() throws Exception {
cfg2.filterDupes = true;
cfg2.operation = WriteOperationType.UPSERT;
try {
- new HoodieDeltaStreamer(cfg2, jsc).sync();
+ syncOnce(cfg2);
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
}
@@ -2771,7 +2759,7 @@ private void runStreamSync(
cfg.filterDupes = filterDupes;
cfg.sourceLimit = numberOfRecords;
cfg.operation = operationType;
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
}
@ParameterizedTest
@@ -2919,12 +2907,11 @@ private void testORCDFSSource(boolean useSchemaProvider, List transforme
UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, storage, basePath + "/" + PROPS_FILENAME_TEST_ORC);
String tableBasePath = basePath + "/test_orc_source_table" + testNum;
- HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ syncOnce(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ORCDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
- useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
- deltaStreamer.sync();
+ useSchemaProvider, 100000, false, null, null, "timestamp", null));
assertRecordCount(ORC_NUM_RECORDS, tableBasePath, sqlContext);
testNum++;
}
@@ -2999,6 +2986,7 @@ private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoRes
deltaStreamer.sync();
assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
testNum++;
+ deltaStreamer.shutdownGracefully();
}
@Test
@@ -3020,6 +3008,7 @@ public void testJsonKafkaDFSSource() throws Exception {
prepareJsonKafkaDFSFiles(records, false, topicName);
deltaStreamer.sync();
assertRecordCount(totalRecords, tableBasePath, sqlContext);
+ deltaStreamer.shutdownGracefully();
}
@Test
@@ -3032,11 +3021,10 @@ public void testJsonKafkaDFSSourceWithOffsets() throws Exception {
prepareJsonKafkaDFSFiles(numRecords, true, topicName, numPartitions);
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName, null, true);
String tableBasePath = basePath + "/test_json_kafka_offsets_table" + testNum;
- HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ syncOnce(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
- true, 100000, false, null, null, "timestamp", null), jsc);
- deltaStreamer.sync();
+ true, 100000, false, null, null, "timestamp", null));
sqlContext.clearCache();
Dataset<Row> ds = sqlContext.read().format("org.apache.hudi").load(tableBasePath);
assertEquals(numRecords, ds.count());
@@ -3085,6 +3073,7 @@ public void testKafkaTimestampType() throws Exception {
"timestamp", String.valueOf(System.currentTimeMillis())), jsc);
deltaStreamer.sync();
assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
+ deltaStreamer.shutdownGracefully();
}
@Disabled("HUDI-6609")
@@ -3229,17 +3218,13 @@ public void testEmptyBatchWithNullSchemaFirstBatch() throws Exception {
config.schemaProviderClassName = NullValueSchemaProvider.class.getName();
config.sourceClassName = TestParquetDFSSourceEmptyBatch.class.getName();
- HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(config, jsc);
- deltaStreamer1.sync();
- deltaStreamer1.shutdownGracefully();
+ syncOnce(config);
assertRecordCount(0, tableBasePath, sqlContext);
config.schemaProviderClassName = null;
config.sourceClassName = ParquetDFSSource.class.getName();
prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
- HoodieDeltaStreamer deltaStreamer2 = new HoodieDeltaStreamer(config, jsc);
- deltaStreamer2.sync();
- deltaStreamer2.shutdownGracefully();
+ syncOnce(config);
//since first batch has empty schema, only records from the second batch should be written
assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
}
@@ -3264,11 +3249,10 @@ private void testDeltaStreamerRestartAfterMissingHoodieProps(boolean testInitFai
PARQUET_SOURCE_ROOT, false, "partition_path", "0");
String tableBasePath = basePath + "/test_parquet_table" + testNum;
- HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ syncOnce(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testInitFailure ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
- useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
- deltaStreamer.sync();
+ useSchemaProvider, 100000, false, null, null, "timestamp", null));
if (testInitFailure) {
FileStatus[] fileStatuses = fs.listStatus(new Path(tableBasePath + "/.hoodie/timeline/"));
@@ -3285,17 +3269,16 @@ private void testDeltaStreamerRestartAfterMissingHoodieProps(boolean testInitFai
// restart the pipeline.
if (testInitFailure) { // should succeed.
- deltaStreamer = new HoodieDeltaStreamer(
+ syncOnce(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
- useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
- deltaStreamer.sync();
+ useSchemaProvider, 100000, false, null, null, "timestamp", null));
assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
} else {
- assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(
+ assertThrows(HoodieIOException.class, () -> syncOnce(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
- useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc));
+ useSchemaProvider, 100000, false, null, null, "timestamp", null)));
}
testNum++;
}
@@ -3373,12 +3356,10 @@ private void testCsvDFSSource(
prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassNames != null);
String tableBasePath = basePath + "/test_csv_table" + testNum;
String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
- HoodieDeltaStreamer deltaStreamer =
- new HoodieDeltaStreamer(TestHelpers.makeConfig(
+ syncOnce(TestHelpers.makeConfig(
tableBasePath, WriteOperationType.INSERT, CsvDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
- useSchemaProvider, 1000, false, null, null, sourceOrderingField, null), jsc);
- deltaStreamer.sync();
+ useSchemaProvider, 1000, false, null, null, sourceOrderingField, null));
assertRecordCount(CSV_NUM_RECORDS, tableBasePath, sqlContext);
testNum++;
}
@@ -3513,6 +3494,7 @@ public void testSqlSourceSource() throws Exception {
deltaStreamer.sync();
assertRecordCount(SQL_SOURCE_NUM_RECORDS * 2, tableBasePath, sqlContext);
+ deltaStreamer.shutdownGracefully();
}
@Test
@@ -3563,7 +3545,7 @@ public void testHoodieIncrFallback() throws Exception {
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
WriteOperationType.BULK_INSERT, true, null);
downstreamCfg.configs.add("hoodie.streamer.source.hoodieincr.num_instants=1");
- new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+ syncOnce(downstreamCfg);
insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
assertRecordCount(1000, downstreamTableBasePath, sqlContext);
@@ -3578,8 +3560,8 @@ public void testHoodieIncrFallback() throws Exception {
//Adding this conf to make testing easier :)
downstreamCfg.configs.add("hoodie.streamer.source.hoodieincr.num_instants=10");
downstreamCfg.operation = WriteOperationType.UPSERT;
- new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
- new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+ syncOnce(downstreamCfg);
+ syncOnce(downstreamCfg);
long baseTableRecords = sqlContext.read().format("org.apache.hudi").load(tableBasePath).count();
long downStreamTableRecords = sqlContext.read().format("org.apache.hudi").load(downstreamTableBasePath).count();
@@ -3598,7 +3580,7 @@ private void insertInTable(String tableBasePath, int count, WriteOperationType o
cfg.configs.add("hoodie.test.source.generate.inserts=true");
for (int i = 0; i < count; i++) {
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
}
}
@@ -3645,6 +3627,7 @@ public void testDeletePartitions() throws Exception {
// There should not be any fileIDs in the deleted partition
assertTrue(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
+ deltaStreamer.shutdownGracefully();
}
@Test
@@ -3680,7 +3663,7 @@ void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOp
cfg.operation = operationType;
// No new data => no commits.
cfg.sourceLimit = 0;
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
if (operationType == WriteOperationType.INSERT_OVERWRITE) {
assertRecordCount(1000, tableBasePath, sqlContext);
@@ -3698,7 +3681,7 @@ void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOp
}
cfg.sourceLimit = 1000;
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertRecordCount(950, tableBasePath, sqlContext);
assertDistanceCount(950, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
@@ -3747,8 +3730,7 @@ public void testDropPartitionColumns(HoodieRecordType recordType) throws Excepti
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
addRecordMerger(recordType, cfg.configs);
cfg.configs.add(String.format("%s=%s", HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true"));
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- ds.sync();
+ syncOnce(cfg);
// assert ingest successful
TestHelpers.assertAtLeastNCommits(1, tableBasePath);
@@ -3774,7 +3756,7 @@ public void testForceEmptyMetaSync() throws Exception {
cfg.enableMetaSync = true;
cfg.forceEmptyMetaSync = true;
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()));
assertRecordCount(0, tableBasePath, sqlContext);
// make sure hive table is present
@@ -3785,9 +3767,10 @@ public void testForceEmptyMetaSync() throws Exception {
.setBasePath(tableBasePath)
.setLoadActiveTimelineOnLoad(true)
.build();
- HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig, metaClient);
- final String tableName = hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
- assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
+ try (HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig, metaClient)) {
+ final String tableName = hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME);
+ assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
+ }
}
@Test
@@ -3795,7 +3778,7 @@ public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
String tableBasePath = basePath + "/test_resume_checkpoint_after_changing_cow_to_mor";
// default table type is COW
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
TestHelpers.assertAtLeastNCommits(1, tableBasePath);
@@ -3817,7 +3800,7 @@ public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
// continue deltastreamer
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
assertRecordCount(1450, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
@@ -3828,7 +3811,7 @@ public void testResumeCheckpointAfterChangingCOW2MOR() throws Exception {
TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath);
// test the table type is already mor
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
// total records should be 1900 now
assertRecordCount(1900, tableBasePath, sqlContext);
@@ -3849,7 +3832,7 @@ public void testResumeCheckpointAfterChangingMOR2COW() throws Exception {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
// change table type to MOR
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
assertRecordCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
TestHelpers.assertAtLeastNCommits(1, tableBasePath);
@@ -3859,7 +3842,7 @@ public void testResumeCheckpointAfterChangingMOR2COW() throws Exception {
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
cfg.configs.add("hoodie.compaction.strategy=org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy");
cfg.configs.add("hoodie.compact.inline.max.delta.commits=1");
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
assertRecordCount(1450, tableBasePath, sqlContext);
// totalCommits: 1 deltacommit(bulk_insert) + 1 deltacommit(upsert) + 1 commit(compaction)
@@ -3888,7 +3871,7 @@ public void testResumeCheckpointAfterChangingMOR2COW() throws Exception {
// continue deltastreamer
cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
assertRecordCount(1900, tableBasePath, sqlContext);
// the checkpoint now should be 00002
@@ -3898,7 +3881,7 @@ public void testResumeCheckpointAfterChangingMOR2COW() throws Exception {
TestHelpers.assertAtLeastNCommits(4, tableBasePath);
// test the table type is already cow
- new HoodieDeltaStreamer(cfg, jsc).sync();
+ syncOnce(cfg);
// out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
// total records should be 2350 now
assertRecordCount(2350, tableBasePath, sqlContext);
@@ -3937,6 +3920,7 @@ public void testAutoGenerateRecordKeys() throws Exception {
deltaStreamer.sync();
assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext);
testNum++;
+ deltaStreamer.shutdownGracefully();
}
@ParameterizedTest
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
index 452c8e09a2299..2b77665ad9d5f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
@@ -95,11 +95,13 @@ private void runDeltaStreamer(WriteOperationType operationType, boolean shouldGe
deltaStreamer.sync();
assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
testNum++;
+ deltaStreamer.shutdownGracefully();
if (shouldGenerateUpdates) {
prepareParquetDFSUpdates(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null, dataGenerator, "001");
HoodieDeltaStreamer updateDs = new HoodieDeltaStreamer(config, jsc);
updateDs.sync();
+ updateDs.shutdownGracefully();
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 60519d333de22..f4b787e042eca 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -136,6 +136,9 @@ public void setupTest() {
public void teardown() throws Exception {
super.teardown();
TestSchemaProvider.resetTargetSchema();
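+    // shut down any streamer started by the test so it releases its resources between test runs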
+ if (deltaStreamer != null) {
+ deltaStreamer.shutdownGracefully();
+ }
}
@AfterAll
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index b11e21d4bbe31..828f0936f2f74 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -247,8 +247,9 @@ public void testBase(String tableType,
assertRecordCount(numRecords);
df = sparkSession.read().format("hudi").load(tableBasePath);
- df.show(100,false);
df.cache();
+ // assert data can be read
+ df.limit(100).collect();
assertDataType(df, "tip_history", DataTypes.createArrayType(DataTypes.LongType));
assertDataType(df, "fare", DataTypes.createStructType(new StructField[]{
new StructField("amount", DataTypes.StringType, true, Metadata.empty()),
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
index 494149cc5ef84..befa89ca8a715 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
@@ -81,6 +81,7 @@ public void testMultipleTransformersWithIdentifiers() throws Exception {
assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
assertEquals(0, sqlContext.read().format("org.apache.hudi").load(tableBasePath).where("timestamp != 110").count());
testNum++;
+ deltaStreamer.shutdownGracefully();
}
/**
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
index 8d701da40ad5f..d1e0f70b78f8c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/multisync/TestMultipleMetaSync.java
@@ -49,7 +49,7 @@ void testMultipleMetaStore() throws Exception {
MockSyncTool2.syncSuccess = false;
// Initial bulk insert to ingest to first hudi table
HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, getSyncNames("MockSyncTool1", "MockSyncTool2"));
- new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+ syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()));
assertTrue(MockSyncTool1.syncSuccess);
assertTrue(MockSyncTool2.syncSuccess);
}
@@ -61,7 +61,7 @@ void testWithException(String syncClassNames) {
MockSyncTool1.syncSuccess = false;
MockSyncTool2.syncSuccess = false;
HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, syncClassNames);
- Exception e = assertThrows(HoodieMetaSyncException.class, () -> new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+ Exception e = assertThrows(HoodieMetaSyncException.class, () -> syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf())));
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
assertTrue(MockSyncTool1.syncSuccess);
assertTrue(MockSyncTool2.syncSuccess);
@@ -73,7 +73,7 @@ void testMultipleExceptions() {
MockSyncTool1.syncSuccess = false;
MockSyncTool2.syncSuccess = false;
HoodieDeltaStreamer.Config cfg = getConfig(tableBasePath, getSyncNames("MockSyncTool1", "MockSyncTool2", "MockSyncToolException1", "MockSyncToolException2"));
- Exception e = assertThrows(HoodieMetaSyncException.class, () -> new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync());
+ Exception e = assertThrows(HoodieMetaSyncException.class, () -> syncOnce(new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf())));
assertTrue(e.getMessage().contains(MockSyncToolException1.class.getName()));
assertTrue(e.getMessage().contains(MockSyncToolException2.class.getName()));
assertTrue(MockSyncTool1.syncSuccess);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index b9017c59ad2f7..243ad570ad75c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -31,6 +31,7 @@
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.streamer.SourceProfile;
import org.apache.hudi.utilities.streamer.SourceProfileSupplier;
+import org.apache.hudi.utilities.testutils.KafkaTestUtils;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -40,9 +41,9 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -71,19 +72,24 @@ public abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarne
protected final Option<SourceProfileSupplier> sourceProfile = Option.of(mock(SourceProfileSupplier.class));
protected SchemaProvider schemaProvider;
- protected KafkaTestUtils testUtils;
+ protected static KafkaTestUtils testUtils;
- @BeforeEach
- public void initClass() {
+ @BeforeAll
+ public static void setupKafka() {
testUtils = new KafkaTestUtils();
testUtils.setup();
}
- @AfterEach
- public void cleanupClass() {
+ @AfterAll
+ public static void teardownKafka() {
testUtils.teardown();
}
+ @AfterEach
+ void cleanupTopics() {
+ testUtils.deleteTopics();
+ }
+
protected abstract TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy);
protected abstract SourceFormatAdapter createSource(TypedProperties props);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index c3c938e452c8e..dd02193d20a80 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -34,6 +34,7 @@
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.testutils.KafkaTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import com.google.crypto.tink.subtle.Base64;
@@ -49,10 +50,9 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -87,24 +87,25 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
protected SchemaProvider schemaProvider;
- protected KafkaTestUtils testUtils;
+ protected static KafkaTestUtils testUtils;
@BeforeAll
public static void initClass() {
dataGen = new HoodieTestDataGenerator(0xDEED);
- }
-
- @BeforeEach
- public void setup() {
testUtils = new KafkaTestUtils();
testUtils.setup();
}
- @AfterEach
- public void tearDown() {
+ @AfterAll
+ public static void tearDown() {
testUtils.teardown();
}
+ @AfterEach
+ void cleanupTopics() {
+ testUtils.deleteTopics();
+ }
+
protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.streamer.source.kafka.topic", topic);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index d67808dd1e048..9095d835eb6f4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -89,7 +89,7 @@ public void testCorruptedSourceFile() throws IOException {
corruptFile(file1Status.getPath());
assertTrue(batch.getBatch().isPresent());
Throwable t = assertThrows(Exception.class,
- () -> batch.getBatch().get().show(30));
+ () -> batch.getBatch().get().limit(30).collect());
while (t != null) {
if (t instanceof SchemaCompatibilityException) {
return;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
index afc441c58d638..1f22f6712be19 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -33,12 +33,12 @@
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.sources.processor.maxwell.MaxwellJsonKafkaSourcePostProcessor;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.testutils.KafkaTestUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index 1865c8ffa6060..cd8e052d8aca4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -28,6 +28,7 @@
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
+import org.apache.hudi.utilities.testutils.KafkaTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.avro.Schema;
@@ -38,12 +39,10 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.streaming.kafka010.KafkaTestUtils;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -62,28 +61,24 @@ public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase {
private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
private final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
- private KafkaTestUtils testUtils;
+ private static KafkaTestUtils testUtils;
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices();
- }
-
- @BeforeEach
- public void setUpKafkaTestUtils() {
testUtils = new KafkaTestUtils();
testUtils.setup();
}
- @AfterEach
- public void tearDownKafkaTestUtils() {
- testUtils.teardown();
- testUtils = null;
- }
-
@AfterAll
public static void cleanupClass() throws IOException {
UtilitiesTestBase.cleanUpUtilitiesTestServices();
+ testUtils.teardown();
+ }
+
+ @AfterEach
+ void cleanupTopics() {
+ testUtils.deleteTopics();
}
private TypedProperties createPropsForJsonSource() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index 33e89c9684d3e..e630d62139747 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -21,13 +21,14 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.LogicalClock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
+import org.apache.hudi.utilities.testutils.KafkaTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
-import org.apache.hudi.common.util.LogicalClock;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
@@ -42,10 +43,10 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.apache.spark.streaming.kafka010.OffsetRange;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -88,20 +89,25 @@
public class TestKafkaOffsetGen {
private final String testTopicName = "hoodie_test_" + UUID.randomUUID();
- private HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
- private KafkaTestUtils testUtils;
+ private final HoodieIngestionMetrics metrics = mock(HoodieIngestionMetrics.class);
+ private static KafkaTestUtils testUtils;
- @BeforeEach
- public void setup() throws Exception {
+ @BeforeAll
+ public static void setup() throws Exception {
testUtils = new KafkaTestUtils();
testUtils.setup();
}
- @AfterEach
- public void teardown() throws Exception {
+ @AfterAll
+ public static void teardown() throws Exception {
testUtils.teardown();
}
+ @AfterEach
+ void cleanupTopics() {
+ testUtils.deleteTopics();
+ }
+
private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
TypedProperties props = new TypedProperties();
props.put("hoodie.streamer.source.kafka.checkpoint.type", kafkaCheckpointType);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/KafkaTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/KafkaTestUtils.java
new file mode 100644
index 0000000000000..d99188e9ec2b5
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/KafkaTestUtils.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
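+/**
+ * Kafka test helper backed by Testcontainers, used in place of Spark's
+ * {@code org.apache.spark.streaming.kafka010.KafkaTestUtils}. It starts a single-broker
+ * Confluent Kafka container and provides helpers to create and delete topics and to
+ * produce string messages for the Kafka source tests.
+ */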
+public class KafkaTestUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaTestUtils.class);
+ private static final DockerImageName KAFKA_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:7.7.1");
+ private final List<String> createdTopics = new ArrayList<>();
+ private ConfluentKafkaContainer kafkaContainer;
+
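+ /** Starts the Kafka container; invoked once per test class from the {@code @BeforeAll} hooks. */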
+ public KafkaTestUtils setup() {
+ kafkaContainer = new ConfluentKafkaContainer(KAFKA_IMAGE);
+ kafkaContainer.start();
+ return this;
+ }
+
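+ /** Returns the bootstrap servers of the running container, failing fast if it has not been started. */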
+ public String brokerAddress() {
+ if (kafkaContainer == null || !kafkaContainer.isRunning()) {
+ throw new IllegalStateException("Kafka container is not running. Please start the container first.");
+ }
+ return kafkaContainer.getBootstrapServers();
+ }
+
+ public void createTopic(String topic) {
+ createTopic(topic, 1);
+ }
+
+ public void createTopic(String topic, int numPartitions) throws TopicExistsException {
+ createTopic(topic, numPartitions, null);
+ }
+
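+ /** Creates the topic with the given partition count and replication factor 1, applying any per-topic configs via alterConfigs. */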
+ public void createTopic(String topic, int numPartitions, Properties properties) throws TopicExistsException {
+ Properties adminProps = getAdminProps();
+
+ try (AdminClient adminClient = AdminClient.create(adminProps)) {
+ createdTopics.add(topic);
+ NewTopic newTopic = new NewTopic(topic, numPartitions, (short) 1);
+ adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+ if (properties != null) {
+ adminClient.alterConfigs(
+ Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, topic),
+ new Config(
+ properties.entrySet().stream()
+ .map(e -> new ConfigEntry(
+ e.getKey().toString(), e.getValue().toString()))
+ .collect(Collectors.toList())))
+ ).all().get();
+ }
+ } catch (Exception e) {
+ if (e.getCause() instanceof TopicExistsException) {
+ throw (TopicExistsException) e.getCause();
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Properties getAdminProps() {
+ Properties adminProps = new Properties();
+ adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ return adminProps;
+ }
+
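+ /** Deletes every topic created through this helper; called from the {@code @AfterEach} hooks so tests sharing the broker stay isolated. */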
+ public void deleteTopics() {
+ if (createdTopics.isEmpty()) {
+ return;
+ }
+ try (AdminClient adminClient = AdminClient.create(getAdminProps())) {
+ adminClient.deleteTopics(createdTopics).all().get();
+ createdTopics.clear();
+ } catch (Exception e) {
+ LOG.warn("Failed to delete topics: {}", StringUtils.join(createdTopics, ","), e);
+ }
+ }
+
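+ /** Produces the given values to the topic (no keys) and flushes before closing the producer. */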
+ public void sendMessages(String topic, String[] messages) {
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps())) {
+ for (String message : messages) {
+ producer.send(new ProducerRecord<>(topic, message));
+ }
+ producer.flush();
+ }
+ }
+
+ public void sendMessages(String topic, Tuple2<String, String>[] keyValuePairs) {
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps())) {
+ for (Tuple2<String, String> kv : keyValuePairs) {
+ producer.send(new ProducerRecord<>(topic, kv._1, kv._2));
+ }
+ producer.flush();
+ }
+ }
+
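+ /** Producer configs pointing at the container's bootstrap servers with String key/value serializers. */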
+ private Properties getProducerProps() {
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ return producerProps;
+ }
+
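+ /** Stops the Kafka container if it is still running. */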
+ public void teardown() {
+ if (kafkaContainer != null && kafkaContainer.isRunning()) {
+ kafkaContainer.stop();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 0e2b49707551a..c65cbd3dba8c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1914,6 +1914,12 @@
      <version>${testcontainers.version}</version>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <version>${testcontainers.version}</version>
+      <scope>test</scope>
+    </dependency>