From c17d5d3ebd55a212d4d3306162164edb1bd7a5d9 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 12 Jul 2023 20:03:19 +0800 Subject: [PATCH 01/22] [FLINK-XXXXX] Upgrade Pulsar version to 3.0.0 Signed-off-by: tison --- .../source/enumerator/cursor/stop/MessageIdStopCursor.java | 3 +-- .../testutils/runtime/container/PulsarContainerRuntime.java | 2 +- .../src/main/resources/META-INF/NOTICE | 6 +++--- pom.xml | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java index ada2fd32..bea8fa26 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java @@ -26,7 +26,6 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.pulsar.client.api.MessageId.earliest; import static org.apache.pulsar.client.api.MessageId.latest; -import static org.apache.pulsar.client.impl.MessageIdImpl.convertToMessageIdImpl; /** * Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for @@ -43,7 +42,7 @@ public MessageIdStopCursor(MessageId messageId, boolean inclusive) { checkArgument(!earliest.equals(messageId), "MessageId.earliest is not supported."); checkArgument(!latest.equals(messageId), "Use LatestMessageStopCursor instead."); - this.messageId = convertToMessageIdImpl(messageId); + this.messageId = messageId; this.inclusive = inclusive; } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java index 20c4ae87..097927ea 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java @@ -54,7 +54,7 @@ public class PulsarContainerRuntime implements PulsarRuntime { private static final String PULSAR_ADMIN_URL = String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_HTTP_PORT); - private static final String CURRENT_VERSION = "2.11.0"; + private static final String CURRENT_VERSION = "3.0.0"; private final PulsarContainer container; private final AtomicBoolean started; diff --git a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE index 9aa8f428..e15f9c8a 100644 --- a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE +++ b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE @@ -6,9 +6,9 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) -- org.apache.pulsar:pulsar-client-admin-api:2.11.0 -- org.apache.pulsar:pulsar-client-all:2.11.0 -- org.apache.pulsar:pulsar-client-api:2.11.0 +- org.apache.pulsar:pulsar-client-admin-api:3.0.0 +- org.apache.pulsar:pulsar-client-all:3.0.0 +- org.apache.pulsar:pulsar-client-api:3.0.0 This project bundles the following dependencies under the Bouncy Castle license. See bundled license files for details. diff --git a/pom.xml b/pom.xml index 75c30832..201fb01a 100644 --- a/pom.xml +++ b/pom.xml @@ -52,7 +52,7 @@ under the License. 1.17.0 - 2.11.0 + 3.0.0 1.69 1.3.9 From e2cfd50fde002723f3867e3b88a4b6dd2af02703 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 14 Jul 2023 11:05:36 +0800 Subject: [PATCH 02/22] try latest flink-ci-tools Signed-off-by: tison --- pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 201fb01a..b67e1e3e 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ under the License. 1.17.0 + 1.18-SNAPSHOT 3.0.0 1.69 @@ -457,7 +458,7 @@ under the License. org.apache.flink flink-ci-tools - ${flink.version} + ${flink-ci-tools.version} From 58485c7010387886a91e9a7bcab8bd370fc1a032 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 14 Jul 2023 13:21:33 +0800 Subject: [PATCH 03/22] compatible with 1.18-SNAPSHOT Signed-off-by: tison --- .../pulsar/sink/writer/PulsarWriterTest.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java index eba7b18f..01914ccf 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java @@ -18,9 +18,11 @@ package org.apache.flink.connector.pulsar.sink.writer; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.connector.sink2.Sink.InitContext; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.configuration.Configuration; @@ -205,6 +207,25 @@ public int getAttemptNumber() { return 0; } + // The following three methods are for compatibility with + // https://github.com/apache/flink/commit/4f5b2fb5736f5a1c098a7dc1d448a879f36f801b + // . Removed the commented out `@Override` when we move to 1.18. + + // @Override + public boolean isObjectReuseEnabled() { + return false; + } + + // @Override + public TypeSerializer createInputSerializer() { + return null; + } + + // @Override + public JobID getJobId() { + return null; + } + @Override public SinkWriterMetricGroup metricGroup() { return metricGroup; From e00a1f25edd410b329b177c872a02e8c66221052 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 14 Jul 2023 14:51:35 +0800 Subject: [PATCH 04/22] try to add missing dep Signed-off-by: tison --- flink-sql-connector-pulsar/pom.xml | 5 +++++ .../src/main/resources/META-INF/NOTICE | 1 + 2 files changed, 6 insertions(+) diff --git a/flink-sql-connector-pulsar/pom.xml b/flink-sql-connector-pulsar/pom.xml index 0c2b2ceb..bdc3809a 100644 --- a/flink-sql-connector-pulsar/pom.xml +++ b/flink-sql-connector-pulsar/pom.xml @@ -44,6 +44,10 @@ under the License. flink-connector-pulsar ${project.version} + + com.fasterxml.jackson.core + jackson-annotations + @@ -61,6 +65,7 @@ under the License. + com.fasterxml.jackson.core:jackson-annotations org.apache.flink:flink-connector-base org.apache.flink:flink-connector-pulsar org.apache.pulsar:pulsar-client-admin-api diff --git a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE index e15f9c8a..08199ce7 100644 --- a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE +++ b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE @@ -6,6 +6,7 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) +- com.fasterxml.jackson.core:jackson-annotations:2.13.4 - org.apache.pulsar:pulsar-client-admin-api:3.0.0 - org.apache.pulsar:pulsar-client-all:3.0.0 - org.apache.pulsar:pulsar-client-api:3.0.0 From c78689c4ea62b17e99e2353f9e05967558e09465 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 16 Jul 2023 12:19:31 +0800 Subject: [PATCH 05/22] try to relocated connector specific classes Signed-off-by: tison --- flink-sql-connector-pulsar/pom.xml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-sql-connector-pulsar/pom.xml b/flink-sql-connector-pulsar/pom.xml index bdc3809a..fea51928 100644 --- a/flink-sql-connector-pulsar/pom.xml +++ b/flink-sql-connector-pulsar/pom.xml @@ -95,11 +95,15 @@ under the License. org.bouncycastle - org.apache.pulsar.shade.org.bouncycastle + org.apache.flink.connector.pulsar.shade.org.bouncycastle com.scurrilous - org.apache.pulsar.shade.com.scurrilous + org.apache.flink.connector.pulsar.shade.com.scurrilous + + + org.apache.pulsar.shade.io.netty + org.apache.flink.connector.pulsar.shade.io.netty From f29bda8e6276adf82ec918defcacbbd6f4c174a8 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 17 Jul 2023 18:28:26 +0800 Subject: [PATCH 06/22] revert shade changes Signed-off-by: tison --- flink-sql-connector-pulsar/pom.xml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/flink-sql-connector-pulsar/pom.xml b/flink-sql-connector-pulsar/pom.xml index fea51928..bdc3809a 100644 --- a/flink-sql-connector-pulsar/pom.xml +++ b/flink-sql-connector-pulsar/pom.xml @@ -95,15 +95,11 @@ under the License. org.bouncycastle - org.apache.flink.connector.pulsar.shade.org.bouncycastle + org.apache.pulsar.shade.org.bouncycastle com.scurrilous - org.apache.flink.connector.pulsar.shade.com.scurrilous - - - org.apache.pulsar.shade.io.netty - org.apache.flink.connector.pulsar.shade.io.netty + org.apache.pulsar.shade.com.scurrilous From db8991a1b6c0335d33c01ca6334fb8cf476ec49d Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 17 Jul 2023 19:16:27 +0800 Subject: [PATCH 07/22] Update flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java Co-authored-by: Yufan Sheng --- .../source/enumerator/cursor/stop/MessageIdStopCursor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java index bea8fa26..51fe26b1 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java @@ -42,7 +42,7 @@ public MessageIdStopCursor(MessageId messageId, boolean inclusive) { checkArgument(!earliest.equals(messageId), "MessageId.earliest is not supported."); checkArgument(!latest.equals(messageId), "Use LatestMessageStopCursor instead."); - this.messageId = messageId; + this.messageId = MessageId.fromByteArray(messageId.toByteArray()); this.inclusive = inclusive; } From 0a6f5614ad57beb1f28b12947a0a93cb5a939c4d Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 17 Jul 2023 19:29:59 +0800 Subject: [PATCH 08/22] fix compilation Signed-off-by: tison --- .../enumerator/cursor/stop/MessageIdStopCursor.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java index 51fe26b1..12588a03 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.pulsar.source.enumerator.cursor.stop; +import java.io.IOException; +import java.io.UncheckedIOException; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.pulsar.client.api.Message; @@ -42,8 +44,12 @@ public MessageIdStopCursor(MessageId messageId, boolean inclusive) { checkArgument(!earliest.equals(messageId), "MessageId.earliest is not supported."); checkArgument(!latest.equals(messageId), "Use LatestMessageStopCursor instead."); - this.messageId = MessageId.fromByteArray(messageId.toByteArray()); this.inclusive = inclusive; + try { + this.messageId = MessageId.fromByteArray(messageId.toByteArray()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } @Override From 625a09b51b0c014f2684e68ac7dbd3180dff0e24 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 17 Jul 2023 19:35:44 +0800 Subject: [PATCH 09/22] checkstyle Signed-off-by: tison --- .../source/enumerator/cursor/stop/MessageIdStopCursor.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java index 12588a03..41fee8dc 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java @@ -18,13 +18,14 @@ package org.apache.flink.connector.pulsar.source.enumerator.cursor.stop; -import java.io.IOException; -import java.io.UncheckedIOException; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import java.io.IOException; +import java.io.UncheckedIOException; + import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.pulsar.client.api.MessageId.earliest; import static org.apache.pulsar.client.api.MessageId.latest; From 2cb31bfdb5143be0f02b98d7b45f0d5993323ed6 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 17 Jul 2023 21:23:51 +0800 Subject: [PATCH 10/22] Revert "revert shade changes" This reverts commit f29bda8e6276adf82ec918defcacbbd6f4c174a8. --- flink-sql-connector-pulsar/pom.xml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-sql-connector-pulsar/pom.xml b/flink-sql-connector-pulsar/pom.xml index bdc3809a..fea51928 100644 --- a/flink-sql-connector-pulsar/pom.xml +++ b/flink-sql-connector-pulsar/pom.xml @@ -95,11 +95,15 @@ under the License. org.bouncycastle - org.apache.pulsar.shade.org.bouncycastle + org.apache.flink.connector.pulsar.shade.org.bouncycastle com.scurrilous - org.apache.pulsar.shade.com.scurrilous + org.apache.flink.connector.pulsar.shade.com.scurrilous + + + org.apache.pulsar.shade.io.netty + org.apache.flink.connector.pulsar.shade.io.netty From f2a3ad1c0595339eeb3c393d8e558423e10673e0 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 17 Jul 2023 22:33:47 +0800 Subject: [PATCH 11/22] Revert "Revert "revert shade changes"" This reverts commit 2cb31bfdb5143be0f02b98d7b45f0d5993323ed6. --- flink-sql-connector-pulsar/pom.xml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/flink-sql-connector-pulsar/pom.xml b/flink-sql-connector-pulsar/pom.xml index fea51928..bdc3809a 100644 --- a/flink-sql-connector-pulsar/pom.xml +++ b/flink-sql-connector-pulsar/pom.xml @@ -95,15 +95,11 @@ under the License. org.bouncycastle - org.apache.flink.connector.pulsar.shade.org.bouncycastle + org.apache.pulsar.shade.org.bouncycastle com.scurrilous - org.apache.flink.connector.pulsar.shade.com.scurrilous - - - org.apache.pulsar.shade.io.netty - org.apache.flink.connector.pulsar.shade.io.netty + org.apache.pulsar.shade.com.scurrilous From c824082b5a405afe4d8272073df76ded98f6a359 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 00:17:02 +0800 Subject: [PATCH 12/22] Revert "Revert "Revert "revert shade changes""" This reverts commit f2a3ad1c0595339eeb3c393d8e558423e10673e0. --- flink-sql-connector-pulsar/pom.xml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-sql-connector-pulsar/pom.xml b/flink-sql-connector-pulsar/pom.xml index bdc3809a..fea51928 100644 --- a/flink-sql-connector-pulsar/pom.xml +++ b/flink-sql-connector-pulsar/pom.xml @@ -95,11 +95,15 @@ under the License. org.bouncycastle - org.apache.pulsar.shade.org.bouncycastle + org.apache.flink.connector.pulsar.shade.org.bouncycastle com.scurrilous - org.apache.pulsar.shade.com.scurrilous + org.apache.flink.connector.pulsar.shade.com.scurrilous + + + org.apache.pulsar.shade.io.netty + org.apache.flink.connector.pulsar.shade.io.netty From 9ddf2fe6c09a54786c9e12ff6a3a574ef0f46d3a Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 00:51:08 +0800 Subject: [PATCH 13/22] Revert "Revert "Revert "Revert "revert shade changes"""" This reverts commit c824082b5a405afe4d8272073df76ded98f6a359. --- flink-sql-connector-pulsar/pom.xml | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/flink-sql-connector-pulsar/pom.xml b/flink-sql-connector-pulsar/pom.xml index fea51928..bdc3809a 100644 --- a/flink-sql-connector-pulsar/pom.xml +++ b/flink-sql-connector-pulsar/pom.xml @@ -95,15 +95,11 @@ under the License. org.bouncycastle - org.apache.flink.connector.pulsar.shade.org.bouncycastle + org.apache.pulsar.shade.org.bouncycastle com.scurrilous - org.apache.flink.connector.pulsar.shade.com.scurrilous - - - org.apache.pulsar.shade.io.netty - org.apache.flink.connector.pulsar.shade.io.netty + org.apache.pulsar.shade.com.scurrilous From f8b7f55d828857ca58d229866a02c3fc215fedd6 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 01:29:37 +0800 Subject: [PATCH 14/22] try to split context Signed-off-by: tison --- .../tests/util/pulsar/PulsarSinkE2ECase.java | 1 - ...java => PulsarSourceKeySharedE2ECase.java} | 14 +---- .../PulsarSourceMultipleTopicsE2ECase.java | 55 +++++++++++++++++++ 3 files changed, 58 insertions(+), 12 deletions(-) rename flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/{PulsarSourceE2ECase.java => PulsarSourceKeySharedE2ECase.java} (84%) create mode 100644 flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceMultipleTopicsE2ECase.java diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java index e2f79760..37b9b50f 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java @@ -51,7 +51,6 @@ public class PulsarSinkE2ECase extends SinkTestSuiteBase { @TestExternalSystem PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); - // Defines a set of external context Factories for different test cases. @TestContext PulsarTestContextFactory sinkContext = new PulsarTestContextFactory<>(pulsar, SingleTopicProducingContext::new); diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceKeySharedE2ECase.java similarity index 84% rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceKeySharedE2ECase.java index ac2388a8..71872520 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceKeySharedE2ECase.java @@ -18,9 +18,9 @@ package org.apache.flink.tests.util.pulsar; +import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicsConsumingContext; -import org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; @@ -29,15 +29,12 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; - import org.junit.jupiter.api.Tag; -import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; - /** Pulsar source E2E test based on the connector testing framework. */ @SuppressWarnings("unused") @Tag("org.apache.flink.testutils.junit.FailsOnJava11") -public class PulsarSourceE2ECase extends SourceTestSuiteBase { +public class PulsarSourceKeySharedE2ECase extends SourceTestSuiteBase { // Defines the Semantic. @TestSemantics CheckpointingMode[] semantics = new CheckpointingMode[] {EXACTLY_ONCE}; @@ -50,12 +47,7 @@ public class PulsarSourceE2ECase extends SourceTestSuiteBase { @TestExternalSystem PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); - // Defines a set of external context Factories for different test cases. @TestContext - PulsarTestContextFactory multipleTopic = + PulsarTestContextFactory context = new PulsarTestContextFactory<>(pulsar, MultipleTopicsConsumingContext::new); - - @TestContext - PulsarTestContextFactory partialKeys = - new PulsarTestContextFactory<>(pulsar, PartialKeysConsumingContext::new); } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceMultipleTopicsE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceMultipleTopicsE2ECase.java new file mode 100644 index 00000000..8d98ffdd --- /dev/null +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceMultipleTopicsE2ECase.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.pulsar; + +import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; +import org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; +import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; + +import org.junit.jupiter.api.Tag; + +import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; + +/** Pulsar source E2E test based on the connector testing framework. */ +@SuppressWarnings("unused") +@Tag("org.apache.flink.testutils.junit.FailsOnJava11") +public class PulsarSourceMultipleTopicsE2ECase extends SourceTestSuiteBase { + + // Defines the Semantic. + @TestSemantics CheckpointingMode[] semantics = new CheckpointingMode[] {EXACTLY_ONCE}; + + // Defines TestEnvironment. + @TestEnv + FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6); + + // Defines ConnectorExternalSystem. + @TestExternalSystem + PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); + + @TestContext + PulsarTestContextFactory context = + new PulsarTestContextFactory<>(pulsar, PartialKeysConsumingContext::new); +} From c8682b17dbd53c0c96ba76894aac95b0ba99b2ae Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 02:08:41 +0800 Subject: [PATCH 15/22] try fix loading Thanks to Yufan! Signed-off-by: tison Co-authored-by: Yufan Sheng --- .../tests/util/pulsar/PulsarSinkE2ECase.java | 4 +- ...dE2ECase.java => PulsarSourceE2ECase.java} | 17 ++++-- .../PulsarSourceMultipleTopicsE2ECase.java | 55 ------------------- .../FlinkContainerWithPulsarEnvironment.java | 9 +-- .../PulsarContainerTestContextFactory.java | 43 +++++++++++++++ .../testutils/PulsarTestCommonUtils.java | 11 +++- .../pulsar/testutils/PulsarTestContext.java | 22 +++++++- 7 files changed, 88 insertions(+), 73 deletions(-) rename flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/{PulsarSourceKeySharedE2ECase.java => PulsarSourceE2ECase.java} (78%) delete mode 100644 flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceMultipleTopicsE2ECase.java create mode 100644 flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java index 37b9b50f..1cedf47e 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java @@ -27,6 +27,7 @@ import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; +import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestContextFactory; import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; import org.junit.jupiter.api.Tag; @@ -51,7 +52,8 @@ public class PulsarSinkE2ECase extends SinkTestSuiteBase { @TestExternalSystem PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); + // Defines a set of external context Factories for different test cases. @TestContext PulsarTestContextFactory sinkContext = - new PulsarTestContextFactory<>(pulsar, SingleTopicProducingContext::new); + new PulsarContainerTestContextFactory<>(pulsar, SingleTopicProducingContext::new); } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceKeySharedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java similarity index 78% rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceKeySharedE2ECase.java rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java index 71872520..d39a8b67 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceKeySharedE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java @@ -18,9 +18,9 @@ package org.apache.flink.tests.util.pulsar; -import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicsConsumingContext; +import org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; @@ -28,13 +28,17 @@ import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; +import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestContextFactory; import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; + import org.junit.jupiter.api.Tag; +import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; + /** Pulsar source E2E test based on the connector testing framework. */ @SuppressWarnings("unused") @Tag("org.apache.flink.testutils.junit.FailsOnJava11") -public class PulsarSourceKeySharedE2ECase extends SourceTestSuiteBase { +public class PulsarSourceE2ECase extends SourceTestSuiteBase { // Defines the Semantic. @TestSemantics CheckpointingMode[] semantics = new CheckpointingMode[] {EXACTLY_ONCE}; @@ -47,7 +51,12 @@ public class PulsarSourceKeySharedE2ECase extends SourceTestSuiteBase { @TestExternalSystem PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); + // Defines a set of external context Factories for different test cases. + @TestContext + PulsarTestContextFactory multipleTopic = + new PulsarContainerTestContextFactory<>(pulsar, MultipleTopicsConsumingContext::new); + @TestContext - PulsarTestContextFactory context = - new PulsarTestContextFactory<>(pulsar, MultipleTopicsConsumingContext::new); + PulsarTestContextFactory partialKeys = + new PulsarContainerTestContextFactory<>(pulsar, PartialKeysConsumingContext::new); } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceMultipleTopicsE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceMultipleTopicsE2ECase.java deleted file mode 100644 index 8d98ffdd..00000000 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceMultipleTopicsE2ECase.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tests.util.pulsar; - -import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; -import org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext; -import org.apache.flink.connector.testframe.junit.annotations.TestContext; -import org.apache.flink.connector.testframe.junit.annotations.TestEnv; -import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; -import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; -import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; -import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; - -import org.junit.jupiter.api.Tag; - -import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; - -/** Pulsar source E2E test based on the connector testing framework. */ -@SuppressWarnings("unused") -@Tag("org.apache.flink.testutils.junit.FailsOnJava11") -public class PulsarSourceMultipleTopicsE2ECase extends SourceTestSuiteBase { - - // Defines the Semantic. - @TestSemantics CheckpointingMode[] semantics = new CheckpointingMode[] {EXACTLY_ONCE}; - - // Defines TestEnvironment. - @TestEnv - FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6); - - // Defines ConnectorExternalSystem. - @TestExternalSystem - PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); - - @TestContext - PulsarTestContextFactory context = - new PulsarTestContextFactory<>(pulsar, PartialKeysConsumingContext::new); -} diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java index ab018f80..5088913a 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java @@ -24,18 +24,11 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment; -import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.resourcePath; - /** A Flink Container which would bundles pulsar connector in its classpath. */ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvironment { public FlinkContainerWithPulsarEnvironment(int numTaskManagers, int numSlotsPerTaskManager) { - super( - flinkConfiguration(), - numTaskManagers, - numSlotsPerTaskManager, - resourcePath("pulsar-connector.jar"), - resourcePath("flink-connector-testing.jar")); + super(flinkConfiguration(), numTaskManagers, numSlotsPerTaskManager); } private static Configuration flinkConfiguration() { diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java new file mode 100644 index 00000000..89c5a3d5 --- /dev/null +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.pulsar.common; + +import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; +import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; + +import java.util.function.Function; + +/** + * This class is used for creating the Pulsar test context which will running in the Flink + * containers. + */ +public class PulsarContainerTestContextFactory> extends PulsarTestContextFactory { + + public PulsarContainerTestContextFactory(PulsarTestEnvironment environment, Function contextFactory) { + super(environment, contextFactory); + } + + @Override + public T createExternalContext(String testName) { + T context = super.createExternalContext(testName); + context.inContainer(); + return context; + } +} diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java index 487fbdcd..970dd44d 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java @@ -25,9 +25,12 @@ import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.test.resources.ResourceTestUtils; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.pulsar.client.api.MessageId; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.List; @@ -46,8 +49,12 @@ public static DeliveryGuarantee toDeliveryGuarantee(CheckpointingMode checkpoint } } - public static String resourcePath(String jarName) { - return ResourceTestUtils.getResource(jarName).toAbsolutePath().toString(); + public static URL resourcePath(String jarName) { + try { + return ResourceTestUtils.getResource(jarName).toAbsolutePath().toUri().toURL(); + } catch (MalformedURLException e) { + throw new FlinkRuntimeException("Couldn't find jar: " + jarName); + } } /** creates a fullRange() partitionSplit. */ diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java index d283df87..805bc22e 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java @@ -21,12 +21,17 @@ import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; import org.apache.flink.connector.testframe.external.ExternalContext; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + import org.apache.pulsar.client.api.Schema; import java.net.URL; import java.util.Collections; import java.util.List; +import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.resourcePath; + + /** * The implementation for Flink connector test tools. Providing the common test case writing * constraint for both source, sink and table API. @@ -36,6 +41,8 @@ public abstract class PulsarTestContext implements ExternalContext { protected final PulsarRuntimeOperator operator; // The schema used for consuming and producing messages between Pulsar and tests. protected final Schema schema; + // If this test case was running in a Flink container. + protected boolean container = false; protected PulsarTestContext(PulsarTestEnvironment environment, Schema schema) { this.operator = environment.operator(); @@ -50,11 +57,20 @@ public String toString() { return displayName(); } + public void inContainer() { + this.container = true; + } + @Override public List getConnectorJarPaths() { - // We don't need any test jars' definition. - // They are provided in docker-related environments. - return Collections.emptyList(); + if (container) { + return ImmutableList.of( + resourcePath("pulsar-connector.jar"), + resourcePath("flink-connector-testing.jar")); + } else { + // We don't need any definition of test jars by default. + return Collections.emptyList(); + } } /** From 6d01fda77168893c6c8b762319d439b94d6b0fb4 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 02:24:20 +0800 Subject: [PATCH 16/22] spotless Signed-off-by: tison --- .../src/test/resources/log4j2-test.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-pulsar-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-pulsar-e2e-tests/src/test/resources/log4j2-test.properties index 835c2ec9..e463a0e1 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/resources/log4j2-test.properties +++ b/flink-connector-pulsar-e2e-tests/src/test/resources/log4j2-test.properties @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = INFO rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger From 9d2f2b70932ea0ec0b40bbbbb3d9f87e2f5edae8 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 02:33:50 +0800 Subject: [PATCH 17/22] spotless Signed-off-by: tison --- .../flink/connector/pulsar/testutils/PulsarTestContext.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java index 805bc22e..e1cc5442 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java @@ -31,7 +31,6 @@ import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.resourcePath; - /** * The implementation for Flink connector test tools. Providing the common test case writing * constraint for both source, sink and table API. From 100f103ba843bd795f03e7b632e85119c1cc159f Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 02:57:43 +0800 Subject: [PATCH 18/22] spotless Signed-off-by: tison --- .../pulsar/common/PulsarContainerTestContextFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java index 89c5a3d5..85c312d9 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java @@ -28,9 +28,11 @@ * This class is used for creating the Pulsar test context which will running in the Flink * containers. */ -public class PulsarContainerTestContextFactory> extends PulsarTestContextFactory { +public class PulsarContainerTestContextFactory> + extends PulsarTestContextFactory { - public PulsarContainerTestContextFactory(PulsarTestEnvironment environment, Function contextFactory) { + public PulsarContainerTestContextFactory( + PulsarTestEnvironment environment, Function contextFactory) { super(environment, contextFactory); } From 51be7ad2073266ec25cf286346f6a864e1097112 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 03:50:19 +0800 Subject: [PATCH 19/22] try increase metaspace Signed-off-by: tison --- .../pulsar/common/FlinkContainerWithPulsarEnvironment.java | 4 ++-- .../src/test/resources/log4j2-test.properties | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java index 5088913a..68f98e66 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java @@ -37,8 +37,8 @@ private static Configuration flinkConfiguration() { // Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048)); configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512)); - configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048)); - configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512)); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560)); + configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(1024)); return configuration; } diff --git a/flink-connector-pulsar-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-pulsar-e2e-tests/src/test/resources/log4j2-test.properties index e463a0e1..835c2ec9 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/resources/log4j2-test.properties +++ b/flink-connector-pulsar-e2e-tests/src/test/resources/log4j2-test.properties @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = INFO +rootLogger.level = OFF rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger From 176125e06e9de15b760c73b307f2fc61ee0f9e2b Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 09:39:18 +0800 Subject: [PATCH 20/22] trigger Signed-off-by: tison From 520bf565737faf1503651e7c723ddc022714cf24 Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 10:40:48 +0800 Subject: [PATCH 21/22] cleanup code Signed-off-by: tison --- .../tests/util/pulsar/PulsarSinkE2ECase.java | 21 +++++---- .../util/pulsar/PulsarSourceE2ECase.java | 26 +++++++---- ...ironment.java => FlinkContainerUtils.java} | 22 ++++++--- .../PulsarContainerTestContextFactory.java | 45 ------------------- .../PulsarContainerTestEnvironment.java | 3 +- .../pulsar/testutils/PulsarTestContext.java | 27 ++++------- 6 files changed, 56 insertions(+), 88 deletions(-) rename flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/{FlinkContainerWithPulsarEnvironment.java => FlinkContainerUtils.java} (75%) delete mode 100644 flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java index 1cedf47e..74467609 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java @@ -18,16 +18,17 @@ package org.apache.flink.tests.util.pulsar; -import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; import org.apache.flink.connector.pulsar.testutils.sink.cases.SingleTopicProducingContext; +import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment; +import org.apache.flink.connector.testframe.external.ExternalContextFactory; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; -import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestContextFactory; +import org.apache.flink.tests.util.pulsar.common.FlinkContainerUtils; import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; import org.junit.jupiter.api.Tag; @@ -46,14 +47,18 @@ public class PulsarSinkE2ECase extends SinkTestSuiteBase { // Defines TestEnvironment @TestEnv - FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6); + FlinkContainerTestEnvironment flink = + new FlinkContainerTestEnvironment(FlinkContainerUtils.flinkConfiguration(), 1, 6); // Defines ConnectorExternalSystem. - @TestExternalSystem - PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); + @TestExternalSystem PulsarTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); // Defines a set of external context Factories for different test cases. @TestContext - PulsarTestContextFactory sinkContext = - new PulsarContainerTestContextFactory<>(pulsar, SingleTopicProducingContext::new); + ExternalContextFactory sinkContext = + ignore -> { + final SingleTopicProducingContext context = new SingleTopicProducingContext(pulsar); + context.addConnectorJarPaths(FlinkContainerUtils.connectorJarPaths()); + return context; + }; } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java index d39a8b67..4f4cadc6 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java @@ -18,17 +18,17 @@ package org.apache.flink.tests.util.pulsar; -import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicsConsumingContext; import org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext; +import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment; +import org.apache.flink.connector.testframe.external.ExternalContextFactory; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; -import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestContextFactory; +import org.apache.flink.tests.util.pulsar.common.FlinkContainerUtils; import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; import org.junit.jupiter.api.Tag; @@ -45,7 +45,8 @@ public class PulsarSourceE2ECase extends SourceTestSuiteBase { // Defines TestEnvironment. @TestEnv - FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6); + FlinkContainerTestEnvironment flink = + new FlinkContainerTestEnvironment(FlinkContainerUtils.flinkConfiguration(), 1, 6); // Defines ConnectorExternalSystem. @TestExternalSystem @@ -53,10 +54,19 @@ public class PulsarSourceE2ECase extends SourceTestSuiteBase { // Defines a set of external context Factories for different test cases. @TestContext - PulsarTestContextFactory multipleTopic = - new PulsarContainerTestContextFactory<>(pulsar, MultipleTopicsConsumingContext::new); + ExternalContextFactory multipleTopic = + ignore -> { + final MultipleTopicsConsumingContext context = + new MultipleTopicsConsumingContext(pulsar); + context.addConnectorJarPaths(FlinkContainerUtils.connectorJarPaths()); + return context; + }; @TestContext - PulsarTestContextFactory partialKeys = - new PulsarContainerTestContextFactory<>(pulsar, PartialKeysConsumingContext::new); + ExternalContextFactory partialKeys = + ignore -> { + final PartialKeysConsumingContext context = new PartialKeysConsumingContext(pulsar); + context.addConnectorJarPaths(FlinkContainerUtils.connectorJarPaths()); + return context; + }; } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java similarity index 75% rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java index 68f98e66..43e951fe 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java @@ -22,16 +22,17 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment; -/** A Flink Container which would bundles pulsar connector in its classpath. */ -public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvironment { +import java.net.URL; +import java.util.ArrayList; +import java.util.List; - public FlinkContainerWithPulsarEnvironment(int numTaskManagers, int numSlotsPerTaskManager) { - super(flinkConfiguration(), numTaskManagers, numSlotsPerTaskManager); - } +import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.resourcePath; + +/** Shared utilities for building Flink containers. */ +public class FlinkContainerUtils { - private static Configuration flinkConfiguration() { + public static Configuration flinkConfiguration() { Configuration configuration = new Configuration(); // Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace @@ -42,4 +43,11 @@ private static Configuration flinkConfiguration() { return configuration; } + + public static List connectorJarPaths() { + List urls = new ArrayList<>(); + urls.add(resourcePath("pulsar-connector.jar")); + urls.add(resourcePath("flink-connector-testing.jar")); + return urls; + } } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java deleted file mode 100644 index 85c312d9..00000000 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestContextFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tests.util.pulsar.common; - -import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; -import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; -import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; - -import java.util.function.Function; - -/** - * This class is used for creating the Pulsar test context which will running in the Flink - * containers. - */ -public class PulsarContainerTestContextFactory> - extends PulsarTestContextFactory { - - public PulsarContainerTestContextFactory( - PulsarTestEnvironment environment, Function contextFactory) { - super(environment, contextFactory); - } - - @Override - public T createExternalContext(String testName) { - T context = super.createExternalContext(testName); - context.inContainer(); - return context; - } -} diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java index 654347be..059f477f 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java @@ -19,13 +19,14 @@ package org.apache.flink.tests.util.pulsar.common; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; +import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment; import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container; /** This test environment is used for create a Pulsar standalone instance for e2e tests. */ public class PulsarContainerTestEnvironment extends PulsarTestEnvironment { - public PulsarContainerTestEnvironment(FlinkContainerWithPulsarEnvironment flinkEnvironment) { + public PulsarContainerTestEnvironment(FlinkContainerTestEnvironment flinkEnvironment) { super(container(flinkEnvironment.getFlinkContainers().getJobManager())); } } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java index e1cc5442..531aa20b 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java @@ -21,16 +21,12 @@ import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; import org.apache.flink.connector.testframe.external.ExternalContext; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; - import org.apache.pulsar.client.api.Schema; import java.net.URL; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; -import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.resourcePath; - /** * The implementation for Flink connector test tools. Providing the common test case writing * constraint for both source, sink and table API. @@ -40,8 +36,8 @@ public abstract class PulsarTestContext implements ExternalContext { protected final PulsarRuntimeOperator operator; // The schema used for consuming and producing messages between Pulsar and tests. protected final Schema schema; - // If this test case was running in a Flink container. - protected boolean container = false; + + private final List connectorJarPaths = new ArrayList<>(); protected PulsarTestContext(PulsarTestEnvironment environment, Schema schema) { this.operator = environment.operator(); @@ -56,20 +52,13 @@ public String toString() { return displayName(); } - public void inContainer() { - this.container = true; - } - @Override public List getConnectorJarPaths() { - if (container) { - return ImmutableList.of( - resourcePath("pulsar-connector.jar"), - resourcePath("flink-connector-testing.jar")); - } else { - // We don't need any definition of test jars by default. - return Collections.emptyList(); - } + return connectorJarPaths; + } + + public void addConnectorJarPaths(List urls) { + this.connectorJarPaths.addAll(urls); } /** From 96f0b7eac17eed5cbbde2ca2f144d20e25ff75fe Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 18 Jul 2023 11:48:36 +0800 Subject: [PATCH 22/22] need not serde message id Signed-off-by: tison --- .../enumerator/cursor/stop/MessageIdStopCursor.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java index 41fee8dc..668b8ef6 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java @@ -23,9 +23,6 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import java.io.IOException; -import java.io.UncheckedIOException; - import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.pulsar.client.api.MessageId.earliest; import static org.apache.pulsar.client.api.MessageId.latest; @@ -46,11 +43,7 @@ public MessageIdStopCursor(MessageId messageId, boolean inclusive) { checkArgument(!latest.equals(messageId), "Use LatestMessageStopCursor instead."); this.inclusive = inclusive; - try { - this.messageId = MessageId.fromByteArray(messageId.toByteArray()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + this.messageId = messageId; } @Override