Commit 9baa6e0

[FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. (apache#19972)
* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead of seeking every time. This should fix the wrong position setting.
* Fix the wrong stop cursor so that it stops at the correct position.
* Drop Consumer.seek() for apache/pulsar#16171.

(cherry picked from commit 18d21a0)
1 parent 3d910dc commit 9baa6e0

43 files changed

+1260
-707
lines changed

docs/content.zh/docs/connectors/datastream/pulsar.md

+41-12
@@ -76,7 +76,7 @@ Pulsar Source provides two ways to subscribe to topics or topic partitions.
7676
```java
7777
PulsarSource.builder().setTopics("some-topic1", "some-topic2");
7878

79-
// Consume from partitions 0 and 1 of topic "topic-a"
79+
// Consume from partitions 0 and 2 of topic "topic-a"
8080
PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
8181
```
8282

@@ -204,10 +204,14 @@ Pulsar Source uses the `setStartCursor(StartCursor)` method to specify where to start consuming
204204
```java
205205
StartCursor.fromMessageId(MessageId, boolean);
206206
```
207-
- Start consuming from the given message time.
207+
- Start consuming from the given message publish time. This method is deprecated because its name is misleading. Use `StartCursor.fromPublishTime(long)` instead.
208208
```java
209209
StartCursor.fromMessageTime(long);
210210
```
211+
- Start consuming from the given message publish time.
212+
```java
213+
StartCursor.fromPublishTime(long);
214+
```
211215

212216
{{< hint info >}}
213217
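Each message has a fixed sequence number that is ordered in Pulsar. It carries raw information such as the ledger, entry, and partition, and is used to locate the message in Pulsar's underlying storage.
@@ -239,14 +243,22 @@ By default, Pulsar Source consumes data in streaming mode. Unless the job fails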
239243
```java
240244
StopCursor.afterMessageId(MessageId);
241245
```
242-
- Stop at a given message publish timestamp, for example `Message<byte[]>.getPublishTime()`.
246+
- Stop at a given message event timestamp, for example `Message<byte[]>.getEventTime()`. The message with this timestamp is not included in the consumed result.
247+
```java
248+
StopCursor.atEventTime(long);
249+
```
250+
- Stop after a given message event timestamp, for example `Message<byte[]>.getEventTime()`. The message with this timestamp is included in the consumed result.
251+
```java
252+
StopCursor.afterEventTime(long);
253+
```
254+
- Stop at a given message publish timestamp, for example `Message<byte[]>.getPublishTime()`. The message with this timestamp is not included in the consumed result.
243255
```java
244256
StopCursor.atPublishTime(long);
245257
```
246-
247-
{{< hint warning >}}
248-
StopCursor.atEventTime(long) is now deprecated.
249-
{{< /hint >}}
258+
- Stop after a given message publish timestamp, for example `Message<byte[]>.getPublishTime()`. The message with this timestamp is included in the consumed result.
259+
```java
260+
StopCursor.afterPublishTime(long);
261+
```
250262

251263
### Source Configurable Options
252264

@@ -352,7 +364,7 @@ PulsarSink<String> sink = PulsarSink.builder()
352364
.setAdminUrl(adminUrl)
353365
.setTopics("topic1")
354366
.setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
355-
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
367+
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
356368
.build();
357369

358370
stream.sinkTo(sink);
@@ -375,10 +387,10 @@ stream.sinkTo(sink);
375387
// Topics "some-topic1" and "some-topic2"
376388
PulsarSink.builder().setTopics("some-topic1", "some-topic2")
377389

378-
// Partitions 0 and 2 of topic "topic-a"
390+
// Partitions 0 and 2 of topic "topic-a"
379391
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
380392

381-
// Partitions 0 and 2 of topic "topic-a", plus topic "some-topic2"
393+
// Partitions 0 and 2 of topic "topic-a", plus topic "some-topic2"
382394
PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
383395
```
384396

@@ -619,7 +631,11 @@ Pulsar Sink uses the producer API to send messages. Pulsar's `ProducerConfigurat
619631

620632
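By default, the Pulsar producer refreshes its stats only every 60 seconds, whereas the Pulsar Sink fetches the latest stats from the Pulsar producer every 500 milliseconds. As a result, the four metrics `numRecordsOut`, `numBytesOut`, `numAcksReceived`, and `numRecordsOutErrors` are actually refreshed only every 60 seconds.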
621633

622-
To refresh the metrics more frequently, use `builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS. 1L)` to lower the Pulsar producer stats refresh interval (minimum 1 second).
634+
To refresh the metrics more frequently, lower the Pulsar producer stats refresh interval (minimum 1 second) as follows:
635+
636+
```java
637+
builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);
638+
```
623639

624640
`numBytesOutRate` and `numRecordsOutRate` are computed by Flink from the `numBytesOut` and `numRecordsOut` counters over a 60-second window.
625641

@@ -650,7 +666,20 @@ Pulsar Sink follows [FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/
650666

651667
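The problem you encounter may be unrelated to Flink. First try upgrading the Pulsar version or the Pulsar client version, or changing the Pulsar configuration or the Pulsar connector configuration.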
652668

669+
## Known Issues
670+
671+
This section describes some known issues with the Pulsar connector.
672+
653673
### Unstable on Java 11
654674

655-
The Pulsar connector has some unresolved issues on Java 11. We currently recommend running the Pulsar connector on Java 8.
675+
The Pulsar connector has some unresolved issues on Java 11. We currently recommend running the Pulsar connector on Java 8.
676+
677+
### A TransactionCoordinatorNotFound exception is thrown instead of reconnecting automatically
678+
679+
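Pulsar transactions are still under active development and are not yet stable. Pulsar 2.9.2
680+
introduced [a breaking change](https://github.com/apache/pulsar/pull/13135) that causes this issue.
681+
If you use Pulsar 2.9.2 or later with an older Pulsar client, you may get a `TransactionCoordinatorNotFound` exception.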
682+
683+
You can use the latest `pulsar-client-all` release to resolve this issue.
684+
656685
{{< top >}}

docs/content/docs/connectors/datastream/pulsar.md

+44-26
@@ -241,10 +241,16 @@ The Pulsar connector consumes from the latest available message if the message I
241241
```java
242242
StartCursor.fromMessageId(MessageId, boolean);
243243
```
244-
- Start from the specified message time by `Message<byte[]>.getPublishTime()`.
244+
- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
245+
This method is deprecated because its name is misleading.
246+
You can use `StartCursor.fromPublishTime(long)` instead.
245247
```java
246248
StartCursor.fromMessageTime(long);
247249
```
250+
- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
251+
```java
252+
StartCursor.fromPublishTime(long);
253+
```
248254

249255
{{< hint info >}}
250256
Each Pulsar message belongs to an ordered sequence on its topic.
@@ -281,14 +287,26 @@ Built-in stop cursors include:
281287
```java
282288
StopCursor.afterMessageId(MessageId);
283289
```
284-
- Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
290+
- Stop at the specified event time by `Message<byte[]>.getEventTime()`. The message with the
291+
given event time won't be included in the consuming result.
292+
```java
293+
StopCursor.atEventTime(long);
294+
```
295+
- Stop after the specified event time by `Message<byte[]>.getEventTime()`. The message with the
296+
given event time will be included in the consuming result.
297+
```java
298+
StopCursor.afterEventTime(long);
299+
```
300+
- Stop at the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
301+
given publish time won't be included in the consuming result.
285302
```java
286303
StopCursor.atPublishTime(long);
287304
```
288-
289-
{{< hint warning >}}
290-
StopCursor.atEventTime(long) is now deprecated.
291-
{{< /hint >}}
305+
- Stop after the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
306+
given publish time will be included in the consuming result.
307+
```java
308+
StopCursor.afterPublishTime(long);
309+
```
292310

293311
### Source Configurable Options
294312

@@ -431,9 +449,9 @@ PulsarSink<String> sink = PulsarSink.builder()
431449
.setAdminUrl(adminUrl)
432450
.setTopics("topic1")
433451
.setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
434-
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
452+
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
435453
.build();
436-
454+
437455
stream.sinkTo(sink);
438456
```
439457

@@ -742,7 +760,7 @@ are updated every 60 seconds. To increase the metrics refresh frequency, you can
742760
the Pulsar producer stats refresh interval to a smaller value (minimum 1 second), as shown below.
743761

744762
```java
745-
builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS. 1L)
763+
builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);
746764
```
747765

748766
`numBytesOutRate` and `numRecordsOutRate` are calculated based on the `numBytesOut` and `numRecordsOut`
@@ -768,23 +786,6 @@ you to reuse the same Flink job after certain "allowed" data model changes, like
768786
a field in an Avro-based POJO class. Please note that you can specify Pulsar schema validation rules
769787
and define an auto schema update. For details, refer to [Pulsar Schema Evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/).
770788

771-
## Known Issues
772-
773-
This section describes some known issues about the Pulsar connectors.
774-
775-
### Unstable on Java 11
776-
777-
Pulsar connector has some known issues on Java 11. It is recommended to run Pulsar connector
778-
on Java 8.
779-
780-
### No TransactionCoordinatorNotFound, but automatic reconnect
781-
782-
Pulsar transactions are still in active development and are not stable. Pulsar 2.9.2
783-
introduces [a break change](https://github.com/apache/pulsar/pull/13135) in transactions.
784-
If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a `TransactionCoordinatorNotFound` exception.
785-
786-
You can use the latest `pulsar-client-all` release to resolve this issue.
787-
788789
## Upgrading to the Latest Connector Version
789790

790791
The generic upgrade steps are outlined in [upgrading jobs and Flink versions guide]({{< ref "docs/ops/upgrading" >}}).
@@ -802,4 +803,21 @@ If you have a problem with Pulsar when using Flink, keep in mind that Flink only
802803
and your problem might be independent of Flink and sometimes can be solved by upgrading Pulsar brokers,
803804
reconfiguring Pulsar brokers or reconfiguring Pulsar connector in Flink.
804805

806+
## Known Issues
807+
808+
This section describes some known issues about the Pulsar connectors.
809+
810+
### Unstable on Java 11
811+
812+
Pulsar connector has some known issues on Java 11. It is recommended to run Pulsar connector
813+
on Java 8.
814+
815+
### TransactionCoordinatorNotFound exception instead of automatic reconnect
816+
817+
Pulsar transactions are still in active development and are not stable. Pulsar 2.9.2
818+
introduces [a breaking change](https://github.com/apache/pulsar/pull/13135) in transactions.
819+
If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a `TransactionCoordinatorNotFound` exception.
820+
821+
You can use the latest `pulsar-client-all` release to resolve this issue.
822+
805823
{{< top >}}
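
The `at…` and `after…` stop cursors documented in this file differ only in whether the message carrying the boundary timestamp is included in the result. The distinction can be sketched in plain Java without the connector. `StopCursorSketch` and its two methods are hypothetical names made up for illustration; they are not part of the Flink or Pulsar API, and the sketch assumes timestamps arrive in non-decreasing order.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of exclusive ("at") versus inclusive ("after") stop semantics. */
final class StopCursorSketch {

    private StopCursorSketch() {}

    /** "at" semantics: stop before the first timestamp that reaches stopTime (boundary excluded). */
    static List<Long> consumeUntilAt(List<Long> timestamps, long stopTime) {
        List<Long> consumed = new ArrayList<>();
        for (long t : timestamps) {
            if (t >= stopTime) {
                break; // the boundary message itself is not consumed
            }
            consumed.add(t);
        }
        return consumed;
    }

    /** "after" semantics: stop once a timestamp exceeds stopTime (boundary included). */
    static List<Long> consumeUntilAfter(List<Long> timestamps, long stopTime) {
        List<Long> consumed = new ArrayList<>();
        for (long t : timestamps) {
            if (t > stopTime) {
                break; // only messages strictly later than the boundary are skipped
            }
            consumed.add(t);
        }
        return consumed;
    }
}
```

With timestamps 10, 20, 30, 40 and a stop time of 30, the "at" variant yields 10 and 20, while the "after" variant also yields 30.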

docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html

-6
@@ -140,12 +140,6 @@
140140
<td>Boolean</td>
141141
<td>If enabled, the consumer will automatically retry messages.</td>
142142
</tr>
143-
<tr>
144-
<td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td>
145-
<td style="word-wrap: break-word;">Latest</td>
146-
<td><p>Enum</p></td>
147-
<td>Initial position at which to set cursor when subscribing to a topic at first time.<br /><br />Possible values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td>
148-
</tr>
149143
<tr>
150144
<td><h5>pulsar.consumer.subscriptionMode</h5></td>
151145
<td style="word-wrap: break-word;">Durable</td>

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java

+7-7
@@ -32,7 +32,8 @@
3232
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
3333
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
3434
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
35-
import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState;
35+
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
36+
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
3637
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
3738
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
3839
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
@@ -142,31 +143,30 @@ public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext
142143
@Override
143144
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator(
144145
SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
145-
SplitsAssignmentState assignmentState =
146-
new SplitsAssignmentState(stopCursor, sourceConfiguration);
146+
SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration);
147147
return new PulsarSourceEnumerator(
148148
subscriber,
149149
startCursor,
150150
rangeGenerator,
151151
sourceConfiguration,
152152
enumContext,
153-
assignmentState);
153+
splitAssigner);
154154
}
155155

156156
@Internal
157157
@Override
158158
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator(
159159
SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
160160
PulsarSourceEnumState checkpoint) {
161-
SplitsAssignmentState assignmentState =
162-
new SplitsAssignmentState(stopCursor, sourceConfiguration, checkpoint);
161+
SplitAssigner splitAssigner =
162+
SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint);
163163
return new PulsarSourceEnumerator(
164164
subscriber,
165165
startCursor,
166166
rangeGenerator,
167167
sourceConfiguration,
168168
enumContext,
169-
assignmentState);
169+
splitAssigner);
170170
}
171171

172172
@Internal

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java

+4-4
@@ -97,7 +97,7 @@
9797
* <p>To stop the connector user has to disable the auto partition discovery. As auto partition
9898
* discovery always expected new splits to come and not exiting. To disable auto partition
9999
* discovery, use builder.setConfig({@link
100-
* PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
100+
* PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
101101
*
102102
* <pre>{@code
103103
* PulsarSource<String> source = PulsarSource
@@ -266,7 +266,7 @@ public PulsarSourceBuilder<OUT> setTopicPattern(
266266
}
267267

268268
/**
269-
* The consumer name is informative and it can be used to identify a particular consumer
269+
* The consumer name is informative, and it can be used to identify a particular consumer
270270
* instance from the topic stats.
271271
*/
272272
public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) {
@@ -321,7 +321,7 @@ public PulsarSourceBuilder<OUT> setStartCursor(StartCursor startCursor) {
321321
* <p>To stop the connector user has to disable the auto partition discovery. As auto partition
322322
* discovery always expected new splits to come and not exiting. To disable auto partition
323323
* discovery, use builder.setConfig({@link
324-
* PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
324+
* PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
325325
*
326326
* @param stopCursor The {@link StopCursor} to specify the stopping offset.
327327
* @return this PulsarSourceBuilder.
@@ -334,7 +334,7 @@ public PulsarSourceBuilder<OUT> setUnboundedStopCursor(StopCursor stopCursor) {
334334
}
335335

336336
/**
337-
* By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
337+
* By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
338338
* and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
339339
* {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link StopCursor}
340340
* to specify the stopping offsets for each partition. When all the partitions have reached

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java

+7
@@ -26,6 +26,7 @@
2626
import org.apache.flink.configuration.description.Description;
2727
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
2828
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
29+
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
2930

3031
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
3132
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -503,6 +504,12 @@ private PulsarSourceOptions() {
503504
code("PulsarClientException"))
504505
.build());
505506

507+
/**
508+
* @deprecated This option would be reset by {@link StartCursor}, no need to use it anymore.
509+
* Pulsar didn't support this config option before 1.10.1, so we have to remove this config
510+
* option.
511+
*/
512+
@Deprecated
506513
public static final ConfigOption<SubscriptionInitialPosition>
507514
PULSAR_SUBSCRIPTION_INITIAL_POSITION =
508515
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionInitialPosition")

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java

-3
@@ -59,7 +59,6 @@
5959
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE;
6060
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_ENABLE;
6161
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
62-
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_INITIAL_POSITION;
6362
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
6463
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
6564
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
@@ -113,8 +112,6 @@ public static <T> ConsumerBuilder<T> createConsumerBuilder(
113112
builder::consumerName);
114113
configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
115114
configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
116-
configuration.useOption(
117-
PULSAR_SUBSCRIPTION_INITIAL_POSITION, builder::subscriptionInitialPosition);
118115
createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy);
119116
configuration.useOption(
120117
PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java

+4
@@ -83,6 +83,10 @@ public int getMessageQueueCapacity() {
8383
return messageQueueCapacity;
8484
}
8585

86+
/**
87+
* We would override the interval into a negative number when we set the connector with bounded
88+
* stop cursor.
89+
*/
8690
public boolean isEnablePartitionDiscovery() {
8791
return getPartitionDiscoveryIntervalMs() > 0;
8892
}
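
The Javadoc added in this hunk says that a bounded stop cursor overrides the discovery interval with a negative number, which is what makes `isEnablePartitionDiscovery()` return `false`. A minimal plain-Java sketch of that interaction follows. `DiscoverySketch` and `forBounded()` are hypothetical and are not connector code; the value `-1` is taken from the builder Javadoc earlier in this commit, and the interval passed to the constructor is whatever the caller configures.

```java
/** Hypothetical stand-in for the interval check in SourceConfiguration. */
final class DiscoverySketch {

    private final long partitionDiscoveryIntervalMs;

    DiscoverySketch(long partitionDiscoveryIntervalMs) {
        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
    }

    /** Assumption: a bounded source forces the interval to -1, as the builder Javadoc suggests. */
    static DiscoverySketch forBounded() {
        return new DiscoverySketch(-1L);
    }

    long getPartitionDiscoveryIntervalMs() {
        return partitionDiscoveryIntervalMs;
    }

    /** Same predicate as in the diff: discovery is enabled only for a positive interval. */
    boolean isEnablePartitionDiscovery() {
        return getPartitionDiscoveryIntervalMs() > 0;
    }
}
```

Note that an interval of zero also disables discovery under this predicate, not only negative values.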
