Skip to content

Commit 938be57

Browse files
committed
[FLINK-27399][Connector/Pulsar] Drop Consumer.seek() for apache/pulsar#16171
1 parent 3f539e4 commit 938be57

File tree

5 files changed

+66
-81
lines changed

5 files changed

+66
-81
lines changed

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
2727

2828
import org.apache.pulsar.client.admin.PulsarAdmin;
29-
import org.apache.pulsar.client.api.Consumer;
29+
import org.apache.pulsar.client.admin.PulsarAdminException;
3030
import org.apache.pulsar.client.api.Message;
3131
import org.apache.pulsar.client.api.MessageId;
3232
import org.apache.pulsar.client.api.PulsarClient;
@@ -75,12 +75,12 @@ protected void finishedPollMessage(Message<byte[]> message) {
7575
}
7676

7777
@Override
78-
protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
78+
protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
7979
MessageId latestConsumedId = split.getLatestConsumedId();
8080

8181
// Reset the start position for ordered pulsar consumer.
8282
if (latestConsumedId != null) {
83-
LOG.debug("Start seeking from the checkpoint {}", latestConsumedId);
83+
LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
8484
try {
8585
MessageId initialPosition;
8686
if (latestConsumedId == MessageId.latest
@@ -91,8 +91,15 @@ protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consum
9191
initialPosition = nextMessageId(latestConsumedId);
9292
}
9393

94-
consumer.seek(initialPosition);
95-
} catch (PulsarClientException e) {
94+
// Consumer.seek() is not used here until pulsar-client-all 2.12.0 is released.
95+
// See https://github.com/apache/pulsar/issues/16757 for more details.
96+
pulsarAdmin
97+
.topics()
98+
.resetCursor(
99+
split.getPartition().getFullTopicName(),
100+
sourceConfiguration.getSubscriptionName(),
101+
initialPosition);
102+
} catch (PulsarAdminException e) {
96103
if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
97104
throw new IllegalArgumentException(e);
98105
} else {

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java

+18-20
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.util.List;
5454
import java.util.concurrent.ExecutionException;
5555
import java.util.concurrent.TimeoutException;
56-
import java.util.concurrent.atomic.AtomicBoolean;
5756

5857
import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
5958
import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
@@ -71,7 +70,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
7170
protected final PulsarAdmin pulsarAdmin;
7271
protected final SourceConfiguration sourceConfiguration;
7372
protected final PulsarDeserializationSchema<OUT> deserializationSchema;
74-
protected final AtomicBoolean wakeup;
7573

7674
protected Consumer<byte[]> pulsarConsumer;
7775
protected PulsarPartitionSplit registeredSplit;
@@ -85,7 +83,6 @@ protected PulsarPartitionSplitReaderBase(
8583
this.pulsarAdmin = pulsarAdmin;
8684
this.sourceConfiguration = sourceConfiguration;
8785
this.deserializationSchema = deserializationSchema;
88-
this.wakeup = new AtomicBoolean(false);
8986
}
9087

9188
@Override
@@ -97,19 +94,14 @@ public RecordsWithSplitIds<PulsarMessage<OUT>> fetch() throws IOException {
9794
return builder.build();
9895
}
9996

100-
// Set wakeup to false for start consuming.
101-
wakeup.compareAndSet(true, false);
102-
10397
StopCursor stopCursor = registeredSplit.getStopCursor();
10498
String splitId = registeredSplit.splitId();
10599
PulsarMessageCollector<OUT> collector = new PulsarMessageCollector<>(splitId, builder);
106100
Deadline deadline = Deadline.fromNow(sourceConfiguration.getMaxFetchTime());
107101

108102
// Consume messages from Pulsar until woken up by the Flink reader.
109103
for (int messageNum = 0;
110-
messageNum < sourceConfiguration.getMaxFetchRecords()
111-
&& deadline.hasTimeLeft()
112-
&& isNotWakeup();
104+
messageNum < sourceConfiguration.getMaxFetchRecords() && deadline.hasTimeLeft();
113105
messageNum++) {
114106
try {
115107
Duration timeout = deadline.timeLeftIfAny();
@@ -170,23 +162,27 @@ public void handleSplitsChanges(SplitsChange<PulsarPartitionSplit> splitsChanges
170162
newSplits.size() == 1, "This pulsar split reader only support one split.");
171163
PulsarPartitionSplit newSplit = newSplits.get(0);
172164

165+
// Open stop cursor.
166+
newSplit.open(pulsarAdmin);
167+
168+
// Before creating the consumer.
169+
beforeCreatingConsumer(newSplit);
170+
173171
// Create pulsar consumer.
174172
Consumer<byte[]> consumer = createPulsarConsumer(newSplit);
175173

176-
// Open start & stop cursor.
177-
newSplit.open(pulsarAdmin);
178-
179-
// Start Consumer.
180-
startConsumer(newSplit, consumer);
174+
// After creating the consumer.
175+
afterCreatingConsumer(newSplit, consumer);
181176

182177
LOG.info("Register split {} consumer for current reader.", newSplit);
178+
183179
this.registeredSplit = newSplit;
184180
this.pulsarConsumer = consumer;
185181
}
186182

187183
@Override
188184
public void wakeUp() {
189-
wakeup.compareAndSet(false, true);
185+
// Nothing to do in this method.
190186
}
191187

192188
@Override
@@ -202,14 +198,16 @@ protected abstract Message<byte[]> pollMessage(Duration timeout)
202198

203199
protected abstract void finishedPollMessage(Message<byte[]> message);
204200

205-
protected abstract void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer);
206-
207-
// --------------------------- Helper Methods -----------------------------
201+
protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
202+
// Nothing to do by default.
203+
}
208204

209-
protected boolean isNotWakeup() {
210-
return !wakeup.get();
205+
protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
206+
// Nothing to do by default.
211207
}
212208

209+
// --------------------------- Helper Methods -----------------------------
210+
213211
/** Create a specified {@link Consumer} by the given split information. */
214212
protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit split) {
215213
return createPulsarConsumer(split.getPartition());

flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ protected void finishedPollMessage(Message<byte[]> message) {
126126
}
127127

128128
@Override
129-
protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
129+
protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
130130
TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
131131

132132
// Abort the uncommitted pulsar transaction.

flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import java.net.URL;
2525
import java.util.ArrayList;
2626
import java.util.List;
27+
import java.util.Random;
28+
29+
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
2730

2831
/** Common test context for pulsar based test. */
2932
public abstract class PulsarTestContext<T> implements DataStreamSourceExternalContext<T> {
@@ -39,10 +42,13 @@ protected PulsarTestContext(PulsarTestEnvironment environment, List<URL> connect
3942
// Helper methods for generating data.
4043

4144
protected List<String> generateStringTestData(int splitIndex, long seed) {
42-
int recordNum = 300;
45+
Random random = new Random(seed);
46+
int recordNum = 300 + random.nextInt(200);
4347
List<String> records = new ArrayList<>(recordNum);
48+
4449
for (int i = 0; i < recordNum; i++) {
45-
records.add(splitIndex + "-" + i);
50+
int length = random.nextInt(40) + 10;
51+
records.add(splitIndex + "-" + i + "-" + randomAlphanumeric(length));
4652
}
4753

4854
return records;

flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java

+27-53
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@
3131
import org.apache.pulsar.client.admin.PulsarAdminException;
3232
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
3333
import org.apache.pulsar.client.api.Consumer;
34+
import org.apache.pulsar.client.api.ConsumerBuilder;
3435
import org.apache.pulsar.client.api.Message;
3536
import org.apache.pulsar.client.api.MessageId;
3637
import org.apache.pulsar.client.api.Producer;
38+
import org.apache.pulsar.client.api.ProducerBuilder;
3739
import org.apache.pulsar.client.api.PulsarClient;
3840
import org.apache.pulsar.client.api.PulsarClientException;
3941
import org.apache.pulsar.client.api.Schema;
40-
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
4142
import org.apache.pulsar.client.api.TypedMessageBuilder;
4243
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
4344
import org.apache.pulsar.client.api.transaction.TxnID;
@@ -49,10 +50,8 @@
4950
import java.io.IOException;
5051
import java.time.Duration;
5152
import java.util.ArrayList;
52-
import java.util.Arrays;
5353
import java.util.Collection;
5454
import java.util.List;
55-
import java.util.Map;
5655
import java.util.Random;
5756
import java.util.concurrent.ConcurrentHashMap;
5857
import java.util.concurrent.ExecutionException;
@@ -62,9 +61,7 @@
6261
import static java.util.Collections.emptyList;
6362
import static java.util.Collections.singletonList;
6463
import static java.util.concurrent.TimeUnit.MILLISECONDS;
65-
import static java.util.function.Function.identity;
6664
import static java.util.stream.Collectors.toList;
67-
import static java.util.stream.Collectors.toMap;
6865
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
6966
import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
7067
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
@@ -79,8 +76,10 @@
7976
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
8077
import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
8178
import static org.apache.flink.util.Preconditions.checkArgument;
79+
import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
8280
import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
8381
import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
82+
import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
8483

8584
/**
8685
* A pulsar cluster operator used for operating pulsar instance. It's serializable for using in
@@ -178,7 +177,7 @@ public <T> void setupTopic(
178177
*/
179178
public void createTopic(String topic, int numberOfPartitions) {
180179
checkArgument(numberOfPartitions >= 0);
181-
if (numberOfPartitions <= 0) {
180+
if (numberOfPartitions == 0) {
182181
createNonPartitionedTopic(topic);
183182
} else {
184183
createPartitionedTopic(topic, numberOfPartitions);
@@ -196,7 +195,7 @@ public void increaseTopicPartitions(String topic, int newPartitionsNum) {
196195
sneakyAdmin(() -> admin().topics().getPartitionedTopicMetadata(topic));
197196
checkArgument(
198197
metadata.partitions < newPartitionsNum,
199-
"The new partition size which should exceed previous size.");
198+
"The new partition size which should greater than previous size.");
200199

201200
sneakyAdmin(() -> admin().topics().updatePartitionedTopic(topic, newPartitionsNum));
202201
}
@@ -220,9 +219,11 @@ public void deleteTopic(String topic) {
220219
return;
221220
}
222221

222+
// Close all the available consumers and producers.
223223
removeConsumers(topic);
224224
removeProducers(topic);
225-
if (metadata.partitions <= 0) {
225+
226+
if (metadata.partitions == NON_PARTITIONED) {
226227
sneakyAdmin(() -> admin().topics().delete(topicName));
227228
} else {
228229
sneakyAdmin(() -> admin().topics().deletePartitionedTopic(topicName));
@@ -245,22 +246,6 @@ public List<TopicPartition> topicInfo(String topic) {
245246
}
246247
}
247248

248-
/**
249-
* Query a list of topics. Convert the topic metadata into a list of topic partitions. Return a
250-
* mapping for topic and its partitions.
251-
*/
252-
public Map<String, List<TopicPartition>> topicsInfo(String... topics) {
253-
return topicsInfo(Arrays.asList(topics));
254-
}
255-
256-
/**
257-
* Query a list of topics. Convert the topic metadata into a list of topic partitions. Return a
258-
* mapping for topic and its partitions.
259-
*/
260-
public Map<String, List<TopicPartition>> topicsInfo(Collection<String> topics) {
261-
return topics.stream().collect(toMap(identity(), this::topicInfo));
262-
}
263-
264249
/**
265250
* Send a single message to Pulsar, return the message id after the ack from Pulsar.
266251
*
@@ -518,12 +503,13 @@ private <T> Producer<T> createProducer(String topic, Schema<T> schema)
518503
topicProducers.computeIfAbsent(
519504
index,
520505
i -> {
521-
try {
522-
return client().newProducer(schema).topic(topic).create();
523-
} catch (PulsarClientException e) {
524-
sneakyThrow(e);
525-
return null;
526-
}
506+
ProducerBuilder<T> builder =
507+
client().newProducer(schema)
508+
.topic(topic)
509+
.enableBatching(false)
510+
.enableMultiSchema(true);
511+
512+
return sneakyClient(builder::create);
527513
});
528514
}
529515

@@ -540,19 +526,15 @@ private <T> Consumer<T> createConsumer(String topic, Schema<T> schema)
540526
topicConsumers.computeIfAbsent(
541527
index,
542528
i -> {
543-
try {
544-
return client().newConsumer(schema)
545-
.topic(topic)
546-
.subscriptionName(SUBSCRIPTION_NAME)
547-
.subscriptionMode(Durable)
548-
.subscriptionType(Exclusive)
549-
.subscriptionInitialPosition(
550-
SubscriptionInitialPosition.Earliest)
551-
.subscribe();
552-
} catch (PulsarClientException e) {
553-
sneakyThrow(e);
554-
return null;
555-
}
529+
ConsumerBuilder<T> builder =
530+
client().newConsumer(schema)
531+
.topic(topic)
532+
.subscriptionName(SUBSCRIPTION_NAME)
533+
.subscriptionMode(Durable)
534+
.subscriptionType(Exclusive)
535+
.subscriptionInitialPosition(Earliest);
536+
537+
return sneakyClient(builder::subscribe);
556538
});
557539
}
558540

@@ -561,11 +543,7 @@ private void removeProducers(String topic) {
561543
ConcurrentHashMap<Integer, Producer<?>> integerProducers = producers.remove(topicName);
562544
if (integerProducers != null) {
563545
for (Producer<?> producer : integerProducers.values()) {
564-
try {
565-
producer.close();
566-
} catch (PulsarClientException e) {
567-
sneakyThrow(e);
568-
}
546+
sneakyClient(producer::close);
569547
}
570548
}
571549
}
@@ -575,11 +553,7 @@ private void removeConsumers(String topic) {
575553
ConcurrentHashMap<Integer, Consumer<?>> integerConsumers = consumers.remove(topicName);
576554
if (integerConsumers != null) {
577555
for (Consumer<?> consumer : integerConsumers.values()) {
578-
try {
579-
consumer.close();
580-
} catch (PulsarClientException e) {
581-
sneakyThrow(e);
582-
}
556+
sneakyClient(consumer::close);
583557
}
584558
}
585559
}

0 commit comments

Comments
 (0)