Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact on maintain the progress of processing records #17

Merged
merged 1 commit into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 10 additions & 136 deletions src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package cn.leancloud.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;

import java.time.Duration;
import java.util.*;

import static java.util.stream.Collectors.toSet;
import static java.util.Collections.emptySet;

abstract class AbstractCommitPolicy<K, V> implements CommitPolicy<K, V> {
abstract class AbstractCommitPolicy<K, V> implements CommitPolicy {
static SleepFunction sleepFunction = Thread::sleep;

interface SleepFunction {
Expand Down Expand Up @@ -45,116 +44,35 @@ void onError(RetriableException e) {
}

protected final Consumer<K, V> consumer;
private final Map<TopicPartition, Long> topicOffsetHighWaterMark;
private final Map<TopicPartition, CompletedOffsets> completedOffsets;
private final long syncCommitRetryIntervalMs;
private final int maxAttemptsForEachSyncCommit;

AbstractCommitPolicy(Consumer<K, V> consumer, Duration syncCommitRetryInterval, int maxAttemptsForEachSyncCommit) {
this.consumer = consumer;
this.topicOffsetHighWaterMark = new HashMap<>();
this.completedOffsets = new HashMap<>();
this.syncCommitRetryIntervalMs = syncCommitRetryInterval.toMillis();
this.maxAttemptsForEachSyncCommit = maxAttemptsForEachSyncCommit;
}

@Override
public void markPendingRecord(ConsumerRecord<K, V> record) {
final TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
topicOffsetHighWaterMark.merge(
topicPartition,
record.offset() + 1,
Math::max);

final CompletedOffsets offset = completedOffsets.get(topicPartition);
// please note that if offset exists, it could happen for record.offset() >= offset.nextOffsetToCommit()
// when there're duplicate records which have lower offset than our next offset to commit consumed from broker
if (offset == null) {
completedOffsets.put(topicPartition, new CompletedOffsets(record.offset() - 1L));
}
}

@Override
public void markCompletedRecord(ConsumerRecord<K, V> record) {
final CompletedOffsets offset = completedOffsets.get(new TopicPartition(record.topic(), record.partition()));
// offset could be null, when the partition of the record was revoked before its processing was done
if (offset != null) {
offset.addCompleteOffset(record.offset());
}
}

@Override
public void revokePartitions(Collection<TopicPartition> partitions) {
clearProcessingRecordStatesFor(partitions);
}

@Override
public Set<TopicPartition> partialCommitSync() {
final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = completedTopicOffsetsToCommit();
public Set<TopicPartition> partialCommitSync(ProcessRecordsProgress progress) {
final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = progress.completedOffsetsToCommit();
if (offsetsToCommit.isEmpty()) {
return Collections.emptySet();
return emptySet();
}
commitSyncWithRetry(offsetsToCommit);
updatePartialCommittedOffsets(offsetsToCommit);
progress.updateCommittedOffsets(offsetsToCommit);

return clearProcessingRecordStatesForCompletedPartitions(offsetsToCommit);
return progress.clearCompletedPartitions(offsetsToCommit);
}

Set<TopicPartition> fullCommitSync() {
Set<TopicPartition> fullCommitSync(ProcessRecordsProgress progress) {
commitSyncWithRetry();

final Set<TopicPartition> completePartitions = partitionsForAllRecordsStates();
clearAllProcessingRecordStates();
final Set<TopicPartition> completePartitions = progress.allPartitions();
progress.clearAll();
return completePartitions;
}

@VisibleForTesting
Map<TopicPartition, Long> topicOffsetHighWaterMark() {
return topicOffsetHighWaterMark;
}

Map<TopicPartition, OffsetAndMetadata> completedTopicOffsetsToCommit() {
if (noCompletedOffsets()) {
return Collections.emptyMap();
}

final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (Map.Entry<TopicPartition, CompletedOffsets> entry : completedOffsets.entrySet()) {
final CompletedOffsets offset = entry.getValue();
if (offset.hasOffsetToCommit()) {
offsets.put(entry.getKey(), offset.getOffsetToCommit());
}
}

return offsets;
}

boolean noTopicOffsetsToCommit() {
if (noCompletedOffsets()) {
return true;
}

for (Map.Entry<TopicPartition, CompletedOffsets> entry : completedOffsets.entrySet()) {
final CompletedOffsets offset = entry.getValue();
if (offset.hasOffsetToCommit()) {
return false;
}
}

return true;
}

void updatePartialCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
final CompletedOffsets offset = completedOffsets.get(entry.getKey());
offset.updateCommittedOffset(entry.getValue().offset());
}
}

boolean noCompletedOffsets() {
return completedOffsets.isEmpty();
}

void commitSyncWithRetry() {
final RetryContext context = context();
do {
Expand All @@ -179,50 +97,6 @@ void commitSyncWithRetry(Map<TopicPartition, OffsetAndMetadata> offsets) {
} while (true);
}

Set<TopicPartition> partitionsForAllRecordsStates() {
return new HashSet<>(topicOffsetHighWaterMark.keySet());
}

void clearAllProcessingRecordStates() {
topicOffsetHighWaterMark.clear();
completedOffsets.clear();
}

Set<TopicPartition> clearProcessingRecordStatesForCompletedPartitions(Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
final Set<TopicPartition> partitions = partitionsToSafeResume(committedOffsets);
clearProcessingRecordStatesFor(partitions);
return partitions;
}

void clearProcessingRecordStatesFor(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
topicOffsetHighWaterMark.remove(p);
completedOffsets.remove(p);
}
}

Set<TopicPartition> partitionsToSafeResume() {
return partitionsToSafeResume(completedTopicOffsetsToCommit());
}

Set<TopicPartition> partitionsToSafeResume(Map<TopicPartition, OffsetAndMetadata> completedOffsets) {
return completedOffsets
.entrySet()
.stream()
.filter(entry -> topicOffsetMeetHighWaterMark(entry.getKey(), entry.getValue()))
.map(Map.Entry::getKey)
.collect(toSet());
}

private boolean topicOffsetMeetHighWaterMark(TopicPartition topicPartition, OffsetAndMetadata offset) {
final Long offsetHighWaterMark = topicOffsetHighWaterMark.get(topicPartition);
if (offsetHighWaterMark != null) {
return offset.offset() >= offsetHighWaterMark;
}
// maybe this partition revoked before a msg of this partition was processed
return true;
}

private RetryContext context() {
return new RetryContext(syncCommitRetryIntervalMs, maxAttemptsForEachSyncCommit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ abstract class AbstractRecommitAwareCommitPolicy<K, V> extends AbstractCommitPol
}

@Override
public final Set<TopicPartition> tryCommit(boolean noPendingRecords) {
public final Set<TopicPartition> tryCommit(boolean noPendingRecords, ProcessRecordsProgress progress) {
if (needRecommit()) {
commitSyncWithRetry(offsetsForRecommit());
updateNextRecommitTime();
}
return tryCommit0(noPendingRecords);
return tryCommit0(noPendingRecords, progress);
}

abstract Set<TopicPartition> tryCommit0(boolean noPendingRecords);
abstract Set<TopicPartition> tryCommit0(boolean noPendingRecords, ProcessRecordsProgress progress);

void updateNextRecommitTime() {
updateNextRecommitTime(System.nanoTime());
Expand Down
42 changes: 16 additions & 26 deletions src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package cn.leancloud.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptySet;
Expand All @@ -17,7 +14,6 @@ final class AsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K,
private static final Logger logger = LoggerFactory.getLogger(AsyncCommitPolicy.class);

private final int maxPendingAsyncCommits;
private final OffsetCommitCallback callback;
private int pendingAsyncCommitCounter;
private boolean forceSync;

Expand All @@ -28,19 +24,18 @@ final class AsyncCommitPolicy<K, V> extends AbstractRecommitAwareCommitPolicy<K,
int maxPendingAsyncCommits) {
super(consumer, syncCommitRetryInterval, maxAttemptsForEachSyncCommit, recommitInterval);
this.maxPendingAsyncCommits = maxPendingAsyncCommits;
this.callback = new AsyncCommitCallback();
}

@Override
Set<TopicPartition> tryCommit0(boolean noPendingRecords) {
Set<TopicPartition> tryCommit0(boolean noPendingRecords, ProcessRecordsProgress progress) {
// with forceSync mark it means a previous async commit was failed, so
// we do a sync commit no matter if there's any pending records or completed offsets
if (!forceSync && (!noPendingRecords || noTopicOffsetsToCommit())) {
if (!forceSync && (!noPendingRecords || progress.noOffsetsToCommit())) {
return emptySet();
}

final Set<TopicPartition> partitions = partitionsForAllRecordsStates();
commit();
final Set<TopicPartition> partitions = progress.allPartitions();
commit(progress);

// for our commit policy, no matter syncCommit or asyncCommit we are using, we always
// commit all assigned offsets, so we can update recommit time here safely. And
Expand All @@ -60,29 +55,24 @@ boolean forceSync() {
return forceSync;
}

private void commit() {
private void commit(ProcessRecordsProgress progress) {
if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) {
commitSyncWithRetry();
pendingAsyncCommitCounter = 0;
forceSync = false;
clearAllProcessingRecordStates();
progress.clearAll();
} else {
++pendingAsyncCommitCounter;
consumer.commitAsync(callback);
}
}

private class AsyncCommitCallback implements OffsetCommitCallback {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
--pendingAsyncCommitCounter;
assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;
if (exception != null) {
logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
forceSync = true;
} else {
clearProcessingRecordStatesForCompletedPartitions(offsets);
}
consumer.commitAsync(((offsets, exception) -> {
--pendingAsyncCommitCounter;
assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter;
if (exception != null) {
logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception);
forceSync = true;
} else {
progress.clearCompletedPartitions(offsets);
}
}));
}
}
}
12 changes: 6 additions & 6 deletions src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ final class AutoCommitPolicy<K, V> extends AbstractCommitPolicy<K, V> {
}

@Override
public Set<TopicPartition> tryCommit(boolean noPendingRecords) {
if (noTopicOffsetsToCommit()) {
public Set<TopicPartition> tryCommit(boolean noPendingRecords, ProcessRecordsProgress progress) {
if (progress.noOffsetsToCommit()) {
return emptySet();
}

final Set<TopicPartition> partitions;
if (noPendingRecords) {
partitions = partitionsForAllRecordsStates();
clearAllProcessingRecordStates();
partitions = progress.allPartitions();
progress.clearAll();
} else {
partitions = partitionsToSafeResume();
clearProcessingRecordStatesFor(partitions);
partitions = progress.completedPartitions();
progress.clearFor(partitions);
}

return partitions;
Expand Down
31 changes: 3 additions & 28 deletions src/main/java/cn/leancloud/kafka/consumer/CommitPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,7 @@
import java.util.Collection;
import java.util.Set;

interface CommitPolicy<K, V> {
/**
* Mark an {@link ConsumerRecord} as pending before processing it. So {@link CommitPolicy} can know which and
* how many records we need to process. It is called by {@link Fetcher} when {@code Fetcher} fetched any
* {@link ConsumerRecord}s from Broker.
*
* @param record the {@link ConsumerRecord} need to process
*/
void markPendingRecord(ConsumerRecord<K, V> record);

/**
* Mark an {@link ConsumerRecord} as completed after processing it. So {@link CommitPolicy} can know which and
* how many records we have processed. It is called by {@link Fetcher} when {@code Fetcher} make sure that
* a {@code ConsumerRecord} was processed successfully.
*
* @param record the {@link ConsumerRecord} processed
*/
void markCompletedRecord(ConsumerRecord<K, V> record);

interface CommitPolicy {
/**
* Try commit offset for any {@link TopicPartition}s which has processed {@link ConsumerRecord}s based on the
* intrinsic policy of this {@link CommitPolicy}. This method is called whenever there're any
Expand All @@ -35,7 +17,7 @@ interface CommitPolicy<K, V> {
* calculate this value much quicker
* @return those {@link TopicPartition}s which have no pending {@code ConsumerRecord}s
*/
Set<TopicPartition> tryCommit(boolean noPendingRecords);
Set<TopicPartition> tryCommit(boolean noPendingRecords, ProcessRecordsProgress progress);

/**
* Do a dedicated partition commit synchronously which only commit those {@link ConsumerRecord}s that have
Expand All @@ -44,12 +26,5 @@ interface CommitPolicy<K, V> {
*
* @return those {@link TopicPartition}s which have no pending {@code ConsumerRecord}s
*/
Set<TopicPartition> partialCommitSync();

/**
* Revoke internal states for some partitions.
*
* @param partitions which was revoked from consumer
*/
void revokePartitions(Collection<TopicPartition> partitions);
Set<TopicPartition> partialCommitSync(ProcessRecordsProgress progress);
}
Loading