Skip to content

Commit 81e7328

Browse files
committed
[FLINK-27399][Connector/Pulsar] Change the initial consuming position setting logic to better handle checkpoints. (apache#19972)
* Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead of seeking every time. This should fix the wrong position setting. * Fix the wrong stop cursor and make sure it stops at the correct position. * Drop Consumer.seek() because of apache/pulsar#16171.
1 parent 0e19d8e commit 81e7328

File tree

56 files changed

+2923
-1263
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+2923
-1263
lines changed

docs/content.zh/docs/connectors/datastream/pulsar.md

+117-114
Large diffs are not rendered by default.

docs/content/docs/connectors/datastream/pulsar.md

+147-125
Large diffs are not rendered by default.

docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html

-6
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,6 @@
140140
<td>Boolean</td>
141141
<td>If enabled, the consumer will automatically retry messages.</td>
142142
</tr>
143-
<tr>
144-
<td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td>
145-
<td style="word-wrap: break-word;">Latest</td>
146-
<td><p>Enum</p></td>
147-
<td>Initial position at which to set cursor when subscribing to a topic at first time.<br /><br />Possible values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td>
148-
</tr>
149143
<tr>
150144
<td><h5>pulsar.consumer.subscriptionMode</h5></td>
151145
<td style="word-wrap: break-word;">Durable</td>

flink-connectors/flink-connector-pulsar/pom.xml

+67-8
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ under the License.
3636
<packaging>jar</packaging>
3737

3838
<properties>
39-
<pulsar.version>2.8.0</pulsar.version>
39+
<pulsar.version>2.9.1</pulsar.version>
4040

4141
<!-- Test Libraries -->
4242
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
4343
<assertj-core.version>3.20.2</assertj-core.version>
44-
<commons-lang3.version>3.11</commons-lang3.version>
45-
<grpc.version>1.33.0</grpc.version>
44+
<pulsar-commons-lang3.version>3.11</pulsar-commons-lang3.version>
45+
<pulsar-zookeeper.version>3.6.3</pulsar-zookeeper.version>
46+
<pulsar-netty.version>4.1.72.Final</pulsar-netty.version>
47+
<pulsar-grpc.version>1.33.0</pulsar-grpc.version>
4648
</properties>
4749

4850
<dependencies>
@@ -153,12 +155,22 @@ under the License.
153155
<version>${pulsar.version}</version>
154156
<scope>test</scope>
155157
</dependency>
158+
156159
<!-- Pulsar uses a newer commons-lang3 in the broker. -->
157160
<!-- Bump the version only for testing. -->
158161
<dependency>
159162
<groupId>org.apache.commons</groupId>
160163
<artifactId>commons-lang3</artifactId>
161-
<version>${commons-lang3.version}</version>
164+
<version>${pulsar-commons-lang3.version}</version>
165+
<scope>test</scope>
166+
</dependency>
167+
168+
<!-- Pulsar uses a newer zookeeper in the broker. -->
169+
<!-- Bump the version only for testing. -->
170+
<dependency>
171+
<groupId>org.apache.zookeeper</groupId>
172+
<artifactId>zookeeper</artifactId>
173+
<version>${pulsar-zookeeper.version}</version>
162174
<scope>test</scope>
163175
</dependency>
164176

@@ -170,21 +182,63 @@ under the License.
170182
<artifactId>pulsar-client-all</artifactId>
171183
<version>${pulsar.version}</version>
172184
<exclusions>
185+
<exclusion>
186+
<groupId>com.sun.activation</groupId>
187+
<artifactId>javax.activation</artifactId>
188+
</exclusion>
189+
<exclusion>
190+
<groupId>jakarta.activation</groupId>
191+
<artifactId>jakarta.activation-api</artifactId>
192+
</exclusion>
193+
<exclusion>
194+
<groupId>jakarta.ws.rs</groupId>
195+
<artifactId>jakarta.ws.rs-api</artifactId>
196+
</exclusion>
197+
<exclusion>
198+
<groupId>jakarta.xml.bind</groupId>
199+
<artifactId>jakarta.xml.bind-api</artifactId>
200+
</exclusion>
201+
<exclusion>
202+
<groupId>javax.validation</groupId>
203+
<artifactId>validation-api</artifactId>
204+
</exclusion>
205+
<exclusion>
206+
<groupId>javax.xml.bind</groupId>
207+
<artifactId>jaxb-api</artifactId>
208+
</exclusion>
209+
<exclusion>
210+
<groupId>net.jcip</groupId>
211+
<artifactId>jcip-annotations</artifactId>
212+
</exclusion>
173213
<exclusion>
174214
<groupId>org.apache.pulsar</groupId>
175215
<artifactId>pulsar-package-core</artifactId>
176216
</exclusion>
217+
<exclusion>
218+
<groupId>com.beust</groupId>
219+
<artifactId>jcommander</artifactId>
220+
</exclusion>
177221
</exclusions>
178222
</dependency>
179223
</dependencies>
180224

181-
<!-- gRPC use version range which don't support by flink ci. -->
225+
182226
<dependencyManagement>
183227
<dependencies>
228+
<!-- Pulsar uses a higher gRPC version. -->
184229
<dependency>
185230
<groupId>io.grpc</groupId>
186231
<artifactId>grpc-bom</artifactId>
187-
<version>${grpc.version}</version>
232+
<version>${pulsar-grpc.version}</version>
233+
<type>pom</type>
234+
<scope>import</scope>
235+
</dependency>
236+
237+
<!-- Pulsar uses a higher netty version. -->
238+
<dependency>
239+
<groupId>io.netty</groupId>
240+
<artifactId>netty-bom</artifactId>
241+
<version>${pulsar-netty.version}</version>
188242
<type>pom</type>
189243
<scope>import</scope>
190244
</dependency>
@@ -207,7 +261,9 @@ under the License.
207261
<configuration>
208262
<!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
209263
<forkCount>1</forkCount>
210-
<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en</argLine>
264+
<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber}
265+
-XX:-UseGCOverheadLimit -Duser.country=US -Duser.language=en
266+
</argLine>
211267
</configuration>
212268
</plugin>
213269
<plugin>
@@ -229,7 +285,8 @@ under the License.
229285
<outputDirectory>
230286
${project.build.directory}/generated-test-sources/protobuf/java
231287
</outputDirectory>
232-
<protocArtifact>com.google.protobuf:protoc:3.5.1:exe:${os.detected.classifier}
288+
<protocArtifact>
289+
com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
233290
</protocArtifact>
234291
</configuration>
235292
<executions>
@@ -275,6 +332,7 @@ under the License.
275332
<include>**/testutils/**</include>
276333
<include>META-INF/LICENSE</include>
277334
<include>META-INF/NOTICE</include>
335+
<include>containers/txnStandalone.conf</include>
278336
</includes>
279337
</configuration>
280338
</execution>
@@ -298,6 +356,7 @@ under the License.
298356
<include>**/testutils/**</include>
299357
<include>META-INF/LICENSE</include>
300358
<include>META-INF/NOTICE</include>
359+
<include>containers/txnStandalone.conf</include>
301360
</includes>
302361
</configuration>
303362
</execution>

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java

+32
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.configuration.ConfigOption;
23+
import org.apache.flink.configuration.ConfigOptions;
2324
import org.apache.flink.configuration.Configuration;
2425

2526
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -31,6 +32,8 @@
3132
import org.apache.pulsar.client.api.PulsarClient;
3233
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
3334

35+
import java.util.HashMap;
36+
import java.util.List;
3437
import java.util.Map;
3538
import java.util.TreeSet;
3639
import java.util.function.Consumer;
@@ -40,6 +43,7 @@
4043
import static java.util.concurrent.TimeUnit.NANOSECONDS;
4144
import static java.util.concurrent.TimeUnit.SECONDS;
4245
import static java.util.function.Function.identity;
46+
import static java.util.stream.Collectors.toList;
4347
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
4448
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
4549
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
@@ -273,4 +277,32 @@ public static <T, V> void setOptionValue(
273277
setter.accept(value);
274278
}
275279
}
280+
281+
/**
282+
* Collect the map stored under the given option plus every "{option key}.{sub key}" entry found in the configuration, keyed by the sub key; sub key entries replace map entries with the same key. Returns a new, empty map if neither exists.
283+
*/
284+
public static Map<String, String> getProperties(
285+
Configuration configuration, ConfigOption<Map<String, String>> option) {
286+
Map<String, String> properties = new HashMap<>();
287+
if (configuration.contains(option)) {
288+
Map<String, String> map = configuration.get(option);
289+
properties.putAll(map);
290+
}
291+
292+
// Find the flattened "{option key}.{sub key}" entries that have a non-empty sub key. These options could be provided by SQL.
293+
String prefix = option.key() + ".";
294+
List<String> keys =
295+
configuration.keySet().stream()
296+
.filter(key -> key.startsWith(prefix) && key.length() > prefix.length())
297+
.collect(toList());
298+
299+
// Read each flattened entry as a string and store it under its sub key, replacing any value copied from the map above.
300+
for (String key : keys) {
301+
ConfigOption<String> o = ConfigOptions.key(key).stringType().noDefaultValue();
302+
String value = configuration.get(o);
303+
properties.put(key.substring(prefix.length()), value);
304+
}
305+
306+
return properties;
307+
}
276308
}

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java

-118
This file was deleted.

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
3333
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
3434
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
35-
import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState;
35+
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
36+
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
3637
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
3738
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
3839
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
@@ -144,32 +145,31 @@ public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext
144145
@Override
145146
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator(
146147
SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
147-
SplitsAssignmentState assignmentState =
148-
new SplitsAssignmentState(stopCursor, sourceConfiguration);
148+
SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration);
149149
return new PulsarSourceEnumerator(
150150
subscriber,
151151
startCursor,
152152
rangeGenerator,
153153
configuration,
154154
sourceConfiguration,
155155
enumContext,
156-
assignmentState);
156+
splitAssigner);
157157
}
158158

159159
@Override
160160
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator(
161161
SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
162162
PulsarSourceEnumState checkpoint) {
163-
SplitsAssignmentState assignmentState =
164-
new SplitsAssignmentState(stopCursor, sourceConfiguration, checkpoint);
163+
SplitAssigner splitAssigner =
164+
SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint);
165165
return new PulsarSourceEnumerator(
166166
subscriber,
167167
startCursor,
168168
rangeGenerator,
169169
configuration,
170170
sourceConfiguration,
171171
enumContext,
172-
assignmentState);
172+
splitAssigner);
173173
}
174174

175175
@Override

0 commit comments

Comments
 (0)