Skip to content

Commit

Permalink
add offset discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
EMsnap committed May 8, 2023
1 parent 450742e commit e8dd8a1
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,16 @@ public Map<TopicRange, MessageId> offsetForEachTopic(
}
}
return specificOffsets;
case EXTERNAL_SUBSCRIPTION:
Map<TopicRange, MessageId> offsetsFromSubs = new HashMap<>();
for (TopicRange topic : topics) {
offsetsFromSubs.put(
topic,
metadataReader.getPositionFromSubscription(
topic, subscriptionPosition));
}
log.info("offset for each topic: {}", offsetsFromSubs);
return offsetsFromSubs;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.sort.pulsar.internal;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
Expand All @@ -33,9 +36,13 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -260,6 +267,44 @@ public void commitOffsetToCursor(Map<TopicRange, MessageId> offset) {
}
}

public MessageId getPositionFromSubscription(TopicRange topic, MessageId defaultPosition) {
try {
String subscriptionName = subscriptionNameFrom(topic);
TopicStats topicStats = admin.topics().getStats(topic.getTopic());
if (topicStats.getSubscriptions().containsKey(subscriptionName)) {
SubscriptionStats subStats = topicStats.getSubscriptions().get(subscriptionName);
if (subStats.getConsumers().size() != 0) {
throw new IllegalStateException(
"Subscription been actively used by other consumers, "
+ "in this situation, the exactly-once semantics cannot be guaranteed.");
} else {
String encodedSubName =
URLEncoder.encode(subscriptionName, StandardCharsets.UTF_8.toString());
PersistentTopicInternalStats.CursorStats c =
admin.topics()
.getInternalStats(topic.getTopic()).cursors
.get(encodedSubName);
String[] ids = c.markDeletePosition.split(":", 2);
long ledgerId = Long.parseLong(ids[0]);
long entryIdInMarkDelete = Long.parseLong(ids[1]);
// we are getting the next mid from sub position, if the entryId is -1,
// it denotes we haven't read data from the ledger before,
// therefore no need to skip the current entry for the next position
long entryId = entryIdInMarkDelete == -1 ? -1 : entryIdInMarkDelete + 1;
int partitionIdx = TopicName.getPartitionIndex(topic.getTopic());
return new MessageIdImpl(ledgerId, entryId, partitionIdx);
}
} else {
// create sub on topic
admin.topics()
.createSubscription(topic.getTopic(), subscriptionName, defaultPosition);
return defaultPosition;
}
} catch (PulsarAdminException | UnsupportedEncodingException e) {
throw new IllegalStateException("Failed to get stats for topic " + topic, e);
}
}

/**
* Designate the close of the metadata reader.
*/
Expand Down

0 comments on commit e8dd8a1

Please sign in to comment.