[FLINK-32612] Upgrade Pulsar version to 3.0.0 #54

Merged
merged 22 commits on Jul 18, 2023
Changes from 20 commits
@@ -27,6 +27,7 @@
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestContextFactory;
import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;

import org.junit.jupiter.api.Tag;
@@ -54,5 +55,5 @@ public class PulsarSinkE2ECase extends SinkTestSuiteBase<String> {
// Defines a set of external context Factories for different test cases.
@TestContext
PulsarTestContextFactory<String, SingleTopicProducingContext> sinkContext =
new PulsarTestContextFactory<>(pulsar, SingleTopicProducingContext::new);
new PulsarContainerTestContextFactory<>(pulsar, SingleTopicProducingContext::new);
}
@@ -28,6 +28,7 @@
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestContextFactory;
import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;

import org.junit.jupiter.api.Tag;
@@ -53,9 +54,9 @@ public class PulsarSourceE2ECase extends SourceTestSuiteBase<String> {
// Defines a set of external context Factories for different test cases.
@TestContext
PulsarTestContextFactory<String, MultipleTopicsConsumingContext> multipleTopic =
new PulsarTestContextFactory<>(pulsar, MultipleTopicsConsumingContext::new);
new PulsarContainerTestContextFactory<>(pulsar, MultipleTopicsConsumingContext::new);

@TestContext
PulsarTestContextFactory<String, PartialKeysConsumingContext> partialKeys =
new PulsarTestContextFactory<>(pulsar, PartialKeysConsumingContext::new);
new PulsarContainerTestContextFactory<>(pulsar, PartialKeysConsumingContext::new);
}
@@ -24,18 +24,11 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;

import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.resourcePath;

/** A Flink Container which would bundles pulsar connector in its classpath. */
public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvironment {

public FlinkContainerWithPulsarEnvironment(int numTaskManagers, int numSlotsPerTaskManager) {
super(
flinkConfiguration(),
numTaskManagers,
numSlotsPerTaskManager,
resourcePath("pulsar-connector.jar"),
resourcePath("flink-connector-testing.jar"));
super(flinkConfiguration(), numTaskManagers, numSlotsPerTaskManager);
}

private static Configuration flinkConfiguration() {
@@ -44,8 +37,8 @@ private static Configuration flinkConfiguration() {
// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048));
configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512));
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048));
configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512));
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560));
configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(1024));
Comment on lines +41 to +42
Member Author:
Temporary workaround. Hopefully we can locate why the JM metaspace OOMs; it may be related both to the size of the final JAR and to how we load and unload the JAR.


return configuration;
}
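For anyone chasing the same Metaspace OOM outside this test harness, the options bumped above map onto Flink's standard memory configuration keys. The snippet below is only a hedged sketch of that mapping; the class name is made up, and the values simply mirror the diff above.

import org.apache.flink.configuration.Configuration;

/** Illustration only: the same memory bump expressed with raw Flink config keys. */
public final class MetaspaceWorkaroundSketch {

    public static Configuration flinkConfiguration() {
        Configuration configuration = new Configuration();
        // Equivalent keys one could also put into flink-conf.yaml.
        configuration.setString("taskmanager.memory.process.size", "2048m");
        configuration.setString("taskmanager.memory.jvm-metaspace.size", "512m");
        configuration.setString("jobmanager.memory.process.size", "2560m");
        configuration.setString("jobmanager.memory.jvm-metaspace.size", "1024m");
        return configuration;
    }
}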
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.pulsar.common;

import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;

import java.util.function.Function;

/**
* This class is used for creating the Pulsar test context which will run in the Flink
* containers.
*/
public class PulsarContainerTestContextFactory<F, T extends PulsarTestContext<F>>
extends PulsarTestContextFactory<F, T> {

public PulsarContainerTestContextFactory(
PulsarTestEnvironment environment, Function<PulsarTestEnvironment, T> contextFactory) {
super(environment, contextFactory);
}

@Override
public T createExternalContext(String testName) {
T context = super.createExternalContext(testName);
context.inContainer();
return context;
}
}
@@ -23,10 +23,12 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;

import java.io.IOException;
import java.io.UncheckedIOException;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.pulsar.client.api.MessageId.earliest;
import static org.apache.pulsar.client.api.MessageId.latest;
import static org.apache.pulsar.client.impl.MessageIdImpl.convertToMessageIdImpl;

/**
* Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for
@@ -43,8 +45,12 @@ public MessageIdStopCursor(MessageId messageId, boolean inclusive) {
checkArgument(!earliest.equals(messageId), "MessageId.earliest is not supported.");
checkArgument(!latest.equals(messageId), "Use LatestMessageStopCursor instead.");

this.messageId = convertToMessageIdImpl(messageId);
this.inclusive = inclusive;
try {
this.messageId = MessageId.fromByteArray(messageId.toByteArray());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
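The constructor change above replaces the internal MessageIdImpl.convertToMessageIdImpl helper with a serialize/parse round-trip through the public MessageId API. The following is a minimal, hedged sketch of that round-trip in isolation; the ledger/entry ids and the class name are invented purely for illustration.

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;

import java.io.IOException;

/** Illustration only: normalizing a MessageId via the public byte-array API. */
public final class MessageIdRoundTripSketch {

    public static void main(String[] args) throws IOException {
        // Arbitrary ledger id, entry id, and partition index for the demo.
        MessageId original = new MessageIdImpl(42L, 7L, -1);

        // toByteArray() emits the wire representation; fromByteArray() parses it
        // back into a client-side MessageId, so no internal converter is needed.
        MessageId normalized = MessageId.fromByteArray(original.toByteArray());

        // compareTo(), which the stop cursor relies on, behaves the same on the
        // normalized id.
        System.out.println(normalized.compareTo(original) == 0);
    }
}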
@@ -18,9 +18,11 @@

package org.apache.flink.connector.pulsar.sink.writer;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.configuration.Configuration;
@@ -205,6 +207,25 @@ public int getAttemptNumber() {
return 0;
}

// The following three methods are for compatibility with
// https://github.com/apache/flink/commit/4f5b2fb5736f5a1c098a7dc1d448a879f36f801b
// Remove the commented-out `@Override` when we move to 1.18.

Contributor:
I think we can just add the @Override annotation, because this is just a test class.

Member Author:
I remember that javac fails to compile when we build against a Flink version other than 1.18-SNAPSHOT.

Contributor:
That's true. But the master branch should always focus on the latest snapshot version of Flink.

// @Override
public boolean isObjectReuseEnabled() {
return false;
}

// @Override
public <IN> TypeSerializer<IN> createInputSerializer() {
return null;
}

// @Override
public JobID getJobId() {
return null;
}

@Override
public SinkWriterMetricGroup metricGroup() {
return metricGroup;
@@ -25,9 +25,12 @@
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.pulsar.client.api.MessageId;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

@@ -46,8 +49,12 @@ public static DeliveryGuarantee toDeliveryGuarantee(CheckpointingMode checkpoint
}
}

public static String resourcePath(String jarName) {
return ResourceTestUtils.getResource(jarName).toAbsolutePath().toString();
public static URL resourcePath(String jarName) {
try {
return ResourceTestUtils.getResource(jarName).toAbsolutePath().toUri().toURL();
} catch (MalformedURLException e) {
throw new FlinkRuntimeException("Couldn't find jar: " + jarName);
}
}

/** creates a fullRange() partitionSplit. */
@@ -21,12 +21,16 @@
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
import org.apache.flink.connector.testframe.external.ExternalContext;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

import org.apache.pulsar.client.api.Schema;

import java.net.URL;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.resourcePath;

/**
* The implementation for Flink connector test tools. Providing the common test case writing
* constraint for both source, sink and table API.
@@ -36,6 +40,8 @@ public abstract class PulsarTestContext<T> implements ExternalContext {
protected final PulsarRuntimeOperator operator;
// The schema used for consuming and producing messages between Pulsar and tests.
protected final Schema<T> schema;
// Whether this test case is running in a Flink container.
protected boolean container = false;

protected PulsarTestContext(PulsarTestEnvironment environment, Schema<T> schema) {
this.operator = environment.operator();
@@ -50,11 +56,20 @@ public String toString() {
return displayName();
}

public void inContainer() {
this.container = true;
}

@Override
public List<URL> getConnectorJarPaths() {
// We don't need any test jars' definition.
// They are provided in docker-related environments.
return Collections.emptyList();
if (container) {
return ImmutableList.of(
resourcePath("pulsar-connector.jar"),
resourcePath("flink-connector-testing.jar"));
} else {
// We don't need any definition of test jars by default.
return Collections.emptyList();
}
}

/**
@@ -54,7 +54,7 @@ public class PulsarContainerRuntime implements PulsarRuntime {
private static final String PULSAR_ADMIN_URL =
String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_HTTP_PORT);

private static final String CURRENT_VERSION = "2.11.0";
private static final String CURRENT_VERSION = "3.0.0";

private final PulsarContainer container;
private final AtomicBoolean started;
5 changes: 5 additions & 0 deletions flink-sql-connector-pulsar/pom.xml
@@ -44,6 +44,10 @@ under the License.
<artifactId>flink-connector-pulsar</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
</dependencies>

<build>
@@ -61,6 +65,7 @@
<configuration>
<artifactSet>
<includes>
<include>com.fasterxml.jackson.core:jackson-annotations</include>
<include>org.apache.flink:flink-connector-base</include>
<include>org.apache.flink:flink-connector-pulsar</include>
<include>org.apache.pulsar:pulsar-client-admin-api</include>
7 changes: 4 additions & 3 deletions flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
@@ -6,9 +6,10 @@ The Apache Software Foundation (http://www.apache.org/).

This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)

- org.apache.pulsar:pulsar-client-admin-api:2.11.0
- org.apache.pulsar:pulsar-client-all:2.11.0
- org.apache.pulsar:pulsar-client-api:2.11.0
- com.fasterxml.jackson.core:jackson-annotations:2.13.4
- org.apache.pulsar:pulsar-client-admin-api:3.0.0
- org.apache.pulsar:pulsar-client-all:3.0.0
- org.apache.pulsar:pulsar-client-api:3.0.0

This project bundles the following dependencies under the Bouncy Castle license.
See bundled license files for details.
5 changes: 3 additions & 2 deletions pom.xml
@@ -52,7 +52,8 @@ under the License.

<properties>
<flink.version>1.17.0</flink.version>
<pulsar.version>2.11.0</pulsar.version>
<flink-ci-tools.version>1.18-SNAPSHOT</flink-ci-tools.version>
<pulsar.version>3.0.0</pulsar.version>
<bouncycastle.version>1.69</bouncycastle.version>

<jsr305.version>1.3.9</jsr305.version>
@@ -457,7 +458,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ci-tools</artifactId>
<version>${flink.version}</version>
<version>${flink-ci-tools.version}</version>
Contributor:
I think we can just change the flink.version to 1.18-SNAPSHOT.

Member Author:
Let me think about it. If we accept this change, #54 (comment) can be applied as well.

But before we cut a release, we should always pin a non-snapshot version.

Member Author:
The downside is that the main branch and PRs may suddenly break due to SNAPSHOT changes, but that should rarely happen, and we would need to fix it sooner or later anyway.

Contributor:
> Let me think about it. If we accept this change, #54 (comment) can be applied as well.
> But before we cut a release, we should always pin a non-snapshot version.

In my opinion, we should check out a 4.0 branch for Flink 1.17.

Member Author:
There is already one: https://github.com/apache/flink-connector-pulsar/tree/v4.0.

Do you mean we move to 1.18 on master and cut a new release only once 1.18 is released, targeting 1.18+ only?

That said, these breaking changes exist in test code only, so it should still be compatible with 1.17.

But I'd prefer to do that in another PR. This PR focuses on upgrading the Pulsar version, and these changes are workarounds for the currently failing tests.

Contributor (@syhily, Jul 17, 2023):
I see. Bump the master branch with Flink 1.17 and Pulsar 3.0 support, then merge back to the 4.0 branch and change the master branch to the Flink 1.18 snapshot. Am I right?
</dependency>
</dependencies>
</plugin>