Commit 18d21a0

[FLINK-27399][Connector/Pulsar] Change the initial consuming position setting logic to better handle checkpoints. (apache#19972)
* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead of seeking every time. This should fix the wrong position setting.
* Fix the wrong stop cursor so that it stops at the correct position.
* Drop Consumer.seek() for apache/pulsar#16171
1 parent f2fb6b2 commit 18d21a0

43 files changed, +1231 -698 lines changed

docs/content.zh/docs/connectors/datastream/pulsar.md

+25 -5
@@ -322,6 +322,7 @@ The Pulsar Source uses the `setStartCursor(StartCursor)` method to specify the starting
 ```
 {{< /tab >}}
 {{< /tabs >}}
+
 - Unlike the previous option, the given message can be skipped before consuming starts.
 {{< tabs "pulsar-starting-position-from-message-id-bool" >}}
 {{< tab "Java" >}}
@@ -335,7 +336,8 @@ The Pulsar Source uses the `setStartCursor(StartCursor)` method to specify the starting
 ```
 {{< /tab >}}
 {{< /tabs >}}
-- Start consuming from the given message time.
+
+- Start consuming from the given message publish time. This method is no longer recommended because its name is easily misunderstood. You can use `StartCursor.fromPublishTime(long)` instead.
 {{< tabs "pulsar-starting-position-message-time" >}}
 {{< tab "Java" >}}
 ```java
@@ -349,6 +351,11 @@ The Pulsar Source uses the `setStartCursor(StartCursor)` method to specify the starting
 {{< /tab >}}
 {{< /tabs >}}

+- Start consuming from the given message publish time.
+```java
+StartCursor.fromPublishTime(long);
+```
+
 {{< hint info >}}
 Each message has a fixed sequence ID, which is ordered in Pulsar. It contains raw information such as ledger, entry, and partition, and is used to locate the specific message in Pulsar's underlying storage.

@@ -404,6 +411,7 @@ By default, the Pulsar Source consumes data in streaming mode. Unless the job fails
 ```
 {{< /tab >}}
 {{< /tabs >}}
+
 - Stop after a given message; the result includes this message.
 {{< tabs "pulsar-boundedness-after-message-id" >}}
 {{< tab "Java" >}}
@@ -417,7 +425,18 @@ By default, the Pulsar Source consumes data in streaming mode. Unless the job fails
 ```
 {{< /tab >}}
 {{< /tabs >}}
-- Stop at a given message publish timestamp, for example `Message<byte[]>.getPublishTime()`
+
+- Stop at a given message event timestamp, for example `Message<byte[]>.getEventTime()`. Messages with this timestamp are not included in the consuming result.
+```java
+StopCursor.atEventTime(long);
+```
+
+- Stop at a given message event timestamp, for example `Message<byte[]>.getEventTime()`. Messages with this timestamp are included in the consuming result.
+```java
+StopCursor.afterEventTime(long);
+```
+
+- Stop at a given message publish timestamp, for example `Message<byte[]>.getPublishTime()`. Messages with this timestamp are not included in the consuming result.
 {{< tabs "pulsar-boundedness-publish-time" >}}
 {{< tab "Java" >}}
 ```java
@@ -431,9 +450,10 @@ By default, the Pulsar Source consumes data in streaming mode. Unless the job fails
 {{< /tab >}}
 {{< /tabs >}}

-{{< hint warning >}}
-StopCursor.atEventTime(long) is now deprecated.
-{{< /hint >}}
+- Stop at a given message publish timestamp, for example `Message<byte[]>.getPublishTime()`. Messages with this timestamp are included in the consuming result.
+```java
+StopCursor.afterPublishTime(long);
+```

 ### Source Configurable Options

docs/content/docs/connectors/datastream/pulsar.md

+31 -6
@@ -356,6 +356,7 @@ The Pulsar connector consumes from the latest available message if the message I
 ```
 {{< /tab >}}
 {{< /tabs >}}
+
 - Start from a specified message between the earliest and the latest.
 The Pulsar connector consumes from the latest available message if the message ID doesn't exist.

@@ -373,7 +374,10 @@ The Pulsar connector consumes from the latest available message if the message I
 {{< /tab >}}
 {{< /tabs >}}

-- Start from the specified message time by `Message<byte[]>.getPublishTime()`.
+- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
+This method is deprecated because its name is misleading and may cause confusion.
+You can use `StartCursor.fromPublishTime(long)` instead.
+
 {{< tabs "pulsar-starting-position-message-time" >}}
 {{< tab "Java" >}}
 ```java
@@ -387,6 +391,11 @@ The Pulsar connector consumes from the latest available message if the message I
 {{< /tab >}}
 {{< /tabs >}}

+- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
+```java
+StartCursor.fromPublishTime(long);
+```
+
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
@@ -420,7 +429,7 @@ Built-in stop cursors include:
 {{< /tab >}}
 {{< /tabs >}}

-- Stop at the latest available message when the Pulsar source starts consuming messages.
+- Stop at the latest available message when the Pulsar source starts consuming messages.
 {{< tabs "pulsar-boundedness-latest" >}}
 {{< tab "Java" >}}
 ```java
@@ -447,6 +456,7 @@ Built-in stop cursors include:
 ```
 {{< /tab >}}
 {{< /tabs >}}
+
 - Stop but include the given message in the consuming result.
 {{< tabs "pulsar-boundedness-after-message-id" >}}
 {{< tab "Java" >}}
@@ -461,7 +471,20 @@ Built-in stop cursors include:
 {{< /tab >}}
 {{< /tabs >}}

-- Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
+- Stop at the specified event time by `Message<byte[]>.getEventTime()`. The message with the
+given event time won't be included in the consuming result.
+```java
+StopCursor.atEventTime(long);
+```
+
+- Stop after the specified event time by `Message<byte[]>.getEventTime()`. The message with the
+given event time will be included in the consuming result.
+```java
+StopCursor.afterEventTime(long);
+```
+
+- Stop at the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
+given publish time won't be included in the consuming result.
 {{< tabs "pulsar-boundedness-publish-time" >}}
 {{< tab "Java" >}}
 ```java
@@ -475,9 +498,11 @@ Built-in stop cursors include:
 {{< /tab >}}
 {{< /tabs >}}

-{{< hint warning >}}
-StopCursor.atEventTime(long) is now deprecated.
-{{< /hint >}}
+- Stop after the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
+given publish time will be included in the consuming result.
+```java
+StopCursor.afterPublishTime(long);
+```

 ### Source Configurable Options
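
The `at...` and `after...` stop cursors documented in this change differ only in whether a message carrying exactly the given timestamp is still emitted. The following self-contained Java sketch models that rule on a plain array of timestamps. `StopCursorSketch` and `consume` are hypothetical names used for illustration only and are not part of the connector; the comparison logic is an assumption derived from the documentation text above, not a copy of the connector's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the "at" versus "after" stop semantics; not connector code. */
public class StopCursorSketch {

    /** Returns the timestamps that would be emitted before the source stops. */
    public static List<Long> consume(long[] timestamps, long stopTimestamp, boolean inclusive) {
        List<Long> emitted = new ArrayList<>();
        for (long timestamp : timestamps) {
            // "at" (exclusive): stop on the first message whose time is >= the stop timestamp.
            // "after" (inclusive): stop on the first message whose time is > the stop timestamp.
            boolean reachedStop =
                    inclusive ? timestamp > stopTimestamp : timestamp >= stopTimestamp;
            if (reachedStop) {
                break;
            }
            emitted.add(timestamp);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] publishTimes = {100L, 200L, 300L};
        // Models StopCursor.atPublishTime(200): the message at 200 is excluded.
        System.out.println(consume(publishTimes, 200L, false)); // prints [100]
        // Models StopCursor.afterPublishTime(200): the message at 200 is included.
        System.out.println(consume(publishTimes, 200L, true)); // prints [100, 200]
    }
}
```

The same comparison would apply to the event-time variants if the event time is substituted for the publish time.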

docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html

-6
@@ -140,12 +140,6 @@
             <td>Boolean</td>
             <td>If enabled, the consumer will automatically retry messages.</td>
         </tr>
-        <tr>
-            <td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td>
-            <td style="word-wrap: break-word;">Latest</td>
-            <td><p>Enum</p></td>
-            <td>Initial position at which to set cursor when subscribing to a topic at first time.<br /><br />Possible values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td>
-        </tr>
         <tr>
             <td><h5>pulsar.consumer.subscriptionMode</h5></td>
             <td style="word-wrap: break-word;">Durable</td>

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java

+7 -7
@@ -32,7 +32,8 @@
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
-import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
@@ -142,31 +143,30 @@ public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext
     @Override
     public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator(
             SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
-        SplitsAssignmentState assignmentState =
-                new SplitsAssignmentState(stopCursor, sourceConfiguration);
+        SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration);
         return new PulsarSourceEnumerator(
                 subscriber,
                 startCursor,
                 rangeGenerator,
                 sourceConfiguration,
                 enumContext,
-                assignmentState);
+                splitAssigner);
     }

     @Internal
     @Override
     public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator(
             SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
             PulsarSourceEnumState checkpoint) {
-        SplitsAssignmentState assignmentState =
-                new SplitsAssignmentState(stopCursor, sourceConfiguration, checkpoint);
+        SplitAssigner splitAssigner =
+                SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint);
         return new PulsarSourceEnumerator(
                 subscriber,
                 startCursor,
                 rangeGenerator,
                 sourceConfiguration,
                 enumContext,
-                assignmentState);
+                splitAssigner);
     }

     @Internal

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java

+4 -4
@@ -97,7 +97,7 @@
  * <p>To stop the connector user has to disable the auto partition discovery. As auto partition
  * discovery always expected new splits to come and not exiting. To disable auto partition
  * discovery, use builder.setConfig({@link
- * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
  *
  * <pre>{@code
  * PulsarSource<String> source = PulsarSource
@@ -266,7 +266,7 @@ public PulsarSourceBuilder<OUT> setTopicPattern(
     }

     /**
-     * The consumer name is informative and it can be used to identify a particular consumer
+     * The consumer name is informative, and it can be used to identify a particular consumer
      * instance from the topic stats.
      */
     public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) {
@@ -321,7 +321,7 @@ public PulsarSourceBuilder<OUT> setStartCursor(StartCursor startCursor) {
      * <p>To stop the connector user has to disable the auto partition discovery. As auto partition
      * discovery always expected new splits to come and not exiting. To disable auto partition
      * discovery, use builder.setConfig({@link
-     * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+     * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
      *
      * @param stopCursor The {@link StopCursor} to specify the stopping offset.
      * @return this PulsarSourceBuilder.
@@ -334,7 +334,7 @@ public PulsarSourceBuilder<OUT> setUnboundedStopCursor(StopCursor stopCursor) {
     }

     /**
-     * By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
      * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
      * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link StopCursor}
      * to specify the stopping offsets for each partition. When all the partitions have reached

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java

+7
@@ -26,6 +26,7 @@
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
 import org.apache.flink.connector.pulsar.source.config.CursorVerification;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;

 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -503,6 +504,12 @@ private PulsarSourceOptions() {
                                             code("PulsarClientException"))
                                     .build());

+    /**
+     * @deprecated This option would be reset by {@link StartCursor}, no need to use it anymore.
+     * Pulsar didn't support this config option before 1.10.1, so we have to remove this config
+     * option.
+     */
+    @Deprecated
     public static final ConfigOption<SubscriptionInitialPosition>
             PULSAR_SUBSCRIPTION_INITIAL_POSITION =
                     ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionInitialPosition")

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java

-3
@@ -59,7 +59,6 @@
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_ENABLE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_INITIAL_POSITION;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
@@ -113,8 +112,6 @@ public static <T> ConsumerBuilder<T> createConsumerBuilder(
                 builder::consumerName);
         configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
         configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
-        configuration.useOption(
-                PULSAR_SUBSCRIPTION_INITIAL_POSITION, builder::subscriptionInitialPosition);
         createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy);
         configuration.useOption(
                 PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java

+4
@@ -83,6 +83,10 @@ public int getMessageQueueCapacity() {
         return messageQueueCapacity;
     }

+    /**
+     * We would override the interval into a negative number when we set the connector with bounded
+     * stop cursor.
+     */
     public boolean isEnablePartitionDiscovery() {
         return getPartitionDiscoveryIntervalMs() > 0;
     }
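
The Javadoc added in this hunk implies a simple rule: partition discovery counts as enabled only for a positive interval, and a bounded stop cursor forces the interval to a negative number. Below is a minimal sketch of that rule, assuming the override value is `-1` as in the `setConfig(..., -1)` example from the builder Javadoc. `PartitionDiscoverySketch` and `effectiveIntervalMs` are hypothetical names and do not exist in the connector.

```java
/** Hypothetical model of how a bounded source disables partition discovery; not connector code. */
public class PartitionDiscoverySketch {

    /** Assumed override: a bounded source replaces the configured interval with -1. */
    public static long effectiveIntervalMs(long configuredIntervalMs, boolean bounded) {
        return bounded ? -1L : configuredIntervalMs;
    }

    /** Mirrors the check in the diff: discovery is enabled only for a positive interval. */
    public static boolean isEnablePartitionDiscovery(long intervalMs) {
        return intervalMs > 0;
    }

    public static void main(String[] args) {
        long configured = 30_000L;
        // Unbounded source: the configured interval is kept, so discovery stays on.
        System.out.println(isEnablePartitionDiscovery(effectiveIntervalMs(configured, false))); // prints true
        // Bounded source: the interval is overridden, so discovery is off.
        System.out.println(isEnablePartitionDiscovery(effectiveIntervalMs(configured, true))); // prints false
    }
}
```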

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java

+13 -3
@@ -18,15 +18,18 @@

 package org.apache.flink.connector.pulsar.source.enumerator;

+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;

+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;

 /**
  * The state class for pulsar source enumerator, used for storing the split state. This class is
- * managed and controlled by {@link SplitsAssignmentState}.
+ * managed and controlled by {@link SplitAssigner}.
  */
 public class PulsarSourceEnumState {

@@ -46,11 +49,12 @@ public class PulsarSourceEnumState {
     private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;

     /**
-     * A {@link PulsarPartitionSplit} should be assigned for all flink readers. Using this map for
-     * recording assign status.
+     * It is used for Shared subscription. Every {@link PulsarPartitionSplit} should be assigned for
+     * all flink readers. Using this map for recording assign status.
      */
     private final Map<Integer, Set<String>> readerAssignedSplits;

+    /** The pipeline has been triggered and topic partitions have been assigned to readers. */
     private final boolean initialized;

     public PulsarSourceEnumState(
@@ -85,4 +89,10 @@ public Map<Integer, Set<String>> getReaderAssignedSplits() {
     public boolean isInitialized() {
         return initialized;
     }
+
+    /** The initial assignment state for Pulsar. */
+    public static PulsarSourceEnumState initialState() {
+        return new PulsarSourceEnumState(
+                new HashSet<>(), new HashSet<>(), new HashMap<>(), new HashMap<>(), false);
+    }
 }
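
The new `initialState()` factory pairs naturally with the two `SplitAssignerFactory.create(...)` overloads seen in `PulsarSource.java`: one for a fresh start and one for restoring from a checkpoint. The sketch below shows one plausible way such overloads could relate, with the fresh-start path delegating to the restore path using an empty, mutable state. Every type here (`EnumStateSketch`, `EnumState`, `Assigner`) is a hypothetical, simplified stand-in; the delegation itself is an assumption and is not shown in this diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of an enumerator state and its factory; not Flink code. */
public class EnumStateSketch {

    /** Simplified stand-in for an enumerator state, reduced to three fields. */
    public static final class EnumState {
        public final Set<String> appendedPartitions;
        public final Map<Integer, Set<String>> readerAssignedSplits;
        public final boolean initialized;

        public EnumState(
                Set<String> appendedPartitions,
                Map<Integer, Set<String>> readerAssignedSplits,
                boolean initialized) {
            this.appendedPartitions = appendedPartitions;
            this.readerAssignedSplits = readerAssignedSplits;
            this.initialized = initialized;
        }

        /** Fresh, mutable, empty state for a job that starts without a checkpoint. */
        public static EnumState initialState() {
            return new EnumState(new HashSet<>(), new HashMap<>(), false);
        }
    }

    /** Simplified stand-in for a split assigner that owns the state. */
    public static final class Assigner {
        public final EnumState state;

        public Assigner(EnumState state) {
            this.state = state;
        }
    }

    /** Fresh start: assumed to delegate to the restoring overload with the initial state. */
    public static Assigner create() {
        return create(EnumState.initialState());
    }

    /** Restore: reuse the state recovered from a checkpoint. */
    public static Assigner create(EnumState checkpoint) {
        return new Assigner(checkpoint);
    }

    public static void main(String[] args) {
        Assigner fresh = create();
        System.out.println(fresh.state.initialized); // prints false
        // The initial collections are mutable, so the assigner can record progress in them.
        fresh.state.appendedPartitions.add("topic-partition-0");
        EnumState checkpoint =
                new EnumState(
                        fresh.state.appendedPartitions, fresh.state.readerAssignedSplits, true);
        Assigner restored = create(checkpoint);
        System.out.println(restored.state.appendedPartitions.size()); // prints 1
    }
}
```

Using mutable collections in the initial state, as the diff does with `new HashSet<>()` and `new HashMap<>()`, lets the same code path update the state whether it was freshly created or restored.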
