From e8dd8a19dbc3d34f8d69c0f2b070709a3b6f4344 Mon Sep 17 00:00:00 2001 From: pengzirui Date: Sat, 6 May 2023 17:08:13 +0800 Subject: [PATCH] add offset discovery --- .../pulsar/internal/FlinkPulsarSource.java | 10 +++++ .../pulsar/internal/PulsarMetadataReader.java | 45 +++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java index 3ec787d0d37..9225abd4383 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java @@ -1006,6 +1006,16 @@ public Map offsetForEachTopic( } } return specificOffsets; + case EXTERNAL_SUBSCRIPTION: + Map offsetsFromSubs = new HashMap<>(); + for (TopicRange topic : topics) { + offsetsFromSubs.put( + topic, + metadataReader.getPositionFromSubscription( + topic, subscriptionPosition)); + } + log.info("offset for each topic: {}", offsetsFromSubs); + return offsetsFromSubs; } return null; } diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java index 10b44b3708f..780e6803b28 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/PulsarMetadataReader.java @@ -17,6 +17,9 @@ package org.apache.inlong.sort.pulsar.internal; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils; @@ -33,9 +36,13 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Range; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.shade.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -260,6 +267,44 @@ public void commitOffsetToCursor(Map offset) { } } + public MessageId getPositionFromSubscription(TopicRange topic, MessageId defaultPosition) { + try { + String subscriptionName = subscriptionNameFrom(topic); + TopicStats topicStats = admin.topics().getStats(topic.getTopic()); + if (topicStats.getSubscriptions().containsKey(subscriptionName)) { + SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName); + if (subStats.getConsumers().size() != 0) { + throw new IllegalStateException( + "Subscription been actively used by other consumers, " + + "in this situation, the exactly-once semantics cannot be guaranteed."); + } else { + String encodedSubName = + URLEncoder.encode(subscriptionName, StandardCharsets.UTF_8.toString()); + PersistentTopicInternalStats.CursorStats c = + admin.topics() + .getInternalStats(topic.getTopic()).cursors + .get(encodedSubName); + String[] ids = c.markDeletePosition.split(":", 2); + long ledgerId = Long.parseLong(ids[0]); + long entryIdInMarkDelete = Long.parseLong(ids[1]); + // we are getting the next mid from sub position, if the entryId is -1, + // it denotes we haven't read data from the ledger before, + // therefore no need to skip the current entry for the next position + long entryId = entryIdInMarkDelete == -1 ? -1 : entryIdInMarkDelete + 1; + int partitionIdx = TopicName.getPartitionIndex(topic.getTopic()); + return new MessageIdImpl(ledgerId, entryId, partitionIdx); + } + } else { + // create sub on topic + admin.topics() + .createSubscription(topic.getTopic(), subscriptionName, defaultPosition); + return defaultPosition; + } + } catch (PulsarAdminException | UnsupportedEncodingException e) { + throw new IllegalStateException("Failed to get stats for topic " + topic, e); + } + } + /** * Designate the close of the metadata reader. */