Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public CommitRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
maybeAutoCommit();
maybeAutoCommit(this.subscriptionState.allConsumed());
if (!pendingRequests.hasUnsentRequests()) {
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, Collections.emptyList());
}
Expand All @@ -101,7 +101,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
Collections.unmodifiableList(pendingRequests.drain(currentTimeMs)));
}

private void maybeAutoCommit() {
public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (!autoCommitState.isPresent()) {
return;
}
Expand All @@ -111,8 +111,7 @@ private void maybeAutoCommit() {
return;
}

Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptionState.allConsumed();
sendAutoCommit(allConsumedOffsets);
sendAutoCommit(offsets);
autocommit.resetTimer();
autocommit.setInflightCommitStatus(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;

import static java.util.Objects.requireNonNull;

/**
* Background thread runnable that consumes {@code ApplicationEvent} and
* produces {@code BackgroundEvent}. It uses an event loop to consume and
Expand All @@ -61,6 +62,7 @@ public class DefaultBackgroundThread extends KafkaThread {
private final NetworkClientDelegate networkClientDelegate;
private final ErrorEventHandler errorEventHandler;
private final GroupState groupState;
private final SubscriptionState subscriptionState;
private boolean running;

private final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry;
Expand All @@ -71,6 +73,7 @@ public class DefaultBackgroundThread extends KafkaThread {
final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final SubscriptionState subscriptionState,
final ErrorEventHandler errorEventHandler,
final ApplicationEventProcessor processor,
final ConsumerMetadata metadata,
Expand All @@ -90,6 +93,7 @@ public class DefaultBackgroundThread extends KafkaThread {
this.networkClientDelegate = networkClient;
this.errorEventHandler = errorEventHandler;
this.groupState = groupState;
this.subscriptionState = subscriptionState;

this.requestManagerRegistry = new HashMap<>();
this.requestManagerRegistry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorManager));
Expand All @@ -102,15 +106,24 @@ public DefaultBackgroundThread(final Time time,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final ConsumerMetadata metadata,
final SubscriptionState subscriptionState,
final KafkaClient networkClient) {
super(BACKGROUND_THREAD_NAME, true);
requireNonNull(config);
requireNonNull(rebalanceConfig);
requireNonNull(logContext);
requireNonNull(applicationEventQueue);
requireNonNull(backgroundEventQueue);
requireNonNull(metadata);
requireNonNull(subscriptionState);
requireNonNull(networkClient);
try {
this.time = time;
this.log = logContext.logger(getClass());
this.applicationEventQueue = applicationEventQueue;
this.backgroundEventQueue = backgroundEventQueue;
this.subscriptionState = subscriptionState;
this.config = config;
// subscriptionState is initialized by the polling thread
this.metadata = metadata;
this.networkClientDelegate = new NetworkClientDelegate(
this.time,
Expand All @@ -121,7 +134,7 @@ public DefaultBackgroundThread(final Time time,
this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
this.groupState = new GroupState(rebalanceConfig);
this.requestManagerRegistry = Collections.unmodifiableMap(buildRequestManagerRegistry(logContext));
this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry);
this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, requestManagerRegistry, metadata);
} catch (final Exception e) {
close();
throw new KafkaException("Failed to construct background processor", e.getCause());
Expand All @@ -138,11 +151,10 @@ private Map<RequestManager.Type, Optional<RequestManager>> buildRequestManagerRe
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
errorEventHandler,
groupState.groupId);
// Add subscriptionState
CommitRequestManager commitRequestManager = coordinatorManager == null ?
null :
new CommitRequestManager(time,
logContext, null, config,
logContext, this.subscriptionState, config,
coordinatorManager,
groupState);
registry.put(RequestManager.Type.COORDINATOR, Optional.ofNullable(coordinatorManager));
Expand Down Expand Up @@ -214,7 +226,7 @@ private Queue<ApplicationEvent> pollApplicationEvent() {
}

private void consumeApplicationEvent(final ApplicationEvent event) {
Objects.requireNonNull(event);
requireNonNull(event);
applicationEventProcessor.process(event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public DefaultEventHandler(final Time time,
this.applicationEventQueue,
this.backgroundEventQueue,
metadata,
subscriptionState,
networkClient);
this.backgroundThread.start();
}
Expand All @@ -137,6 +138,7 @@ public DefaultEventHandler(final Time time,
final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final SubscriptionState subscriptionState,
final ConsumerMetadata metadata,
final KafkaClient networkClient) {
this.applicationEventQueue = applicationEventQueue;
Expand All @@ -149,6 +151,7 @@ public DefaultEventHandler(final Time time,
this.applicationEventQueue,
this.backgroundEventQueue,
metadata,
subscriptionState,
networkClient);
backgroundThread.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
Expand All @@ -58,6 +60,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -497,7 +500,7 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration

@Override
public Set<TopicPartition> assignment() {
throw new KafkaException("method not implemented");
return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
}

/**
Expand All @@ -522,7 +525,33 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb

@Override
public void assign(Collection<TopicPartition> partitions) {
throw new KafkaException("method not implemented");
if (partitions == null) {
throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
}

if (partitions.isEmpty()) {
// TODO: implementation of unsubscribe() will be included in forthcoming commits.
// this.unsubscribe();
return;
}

for (TopicPartition tp : partitions) {
String topic = (tp != null) ? tp.topic() : null;
if (Utils.isBlank(topic))
throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
}

// TODO: implementation of refactored Fetcher will be included in forthcoming commits.
// fetcher.clearBufferedDataForUnassignedPartitions(partitions);

// assignment change event will trigger autocommit if it is configured and the group id is specified. This is
// to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will
// be no following rebalance
eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds()));

log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public String toString() {
return type + " ApplicationEvent";
}
public enum Type {
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET,
NOOP, COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.NoopBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.common.KafkaException;
Expand All @@ -27,14 +28,17 @@
import java.util.concurrent.BlockingQueue;

public class ApplicationEventProcessor {

private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
private final Map<RequestManager.Type, Optional<RequestManager>> registry;
private final ConsumerMetadata metadata;

public ApplicationEventProcessor(
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry) {
public ApplicationEventProcessor(final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final Map<RequestManager.Type, Optional<RequestManager>> requestManagerRegistry,
final ConsumerMetadata metadata) {
this.backgroundEventQueue = backgroundEventQueue;
this.registry = requestManagerRegistry;
this.metadata = metadata;
}

public boolean process(final ApplicationEvent event) {
Expand All @@ -48,6 +52,10 @@ public boolean process(final ApplicationEvent event) {
return process((PollApplicationEvent) event);
case FETCH_COMMITTED_OFFSET:
return process((OffsetFetchApplicationEvent) event);
case METADATA_UPDATE:
return process((NewTopicsMetadataUpdateRequestEvent) event);
case ASSIGNMENT_CHANGE:
return process((AssignmentChangeApplicationEvent) event);
}
return false;
}
Expand Down Expand Up @@ -106,4 +114,20 @@ private boolean process(final OffsetFetchApplicationEvent event) {
manager.addOffsetFetchRequest(event.partitions);
return true;
}

private boolean process(final NewTopicsMetadataUpdateRequestEvent event) {
metadata.requestUpdateForNewTopics();
return true;
}

private boolean process(final AssignmentChangeApplicationEvent event) {
Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT);
if (!commitRequestManger.isPresent()) {
return false;
}
CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
manager.updateAutoCommitTimer(event.currentTimeMs);
manager.maybeAutoCommit(event.offsets);
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class AssignmentChangeApplicationEvent extends ApplicationEvent {
final Map<TopicPartition, OffsetAndMetadata> offsets;
final long currentTimeMs;

public AssignmentChangeApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final long currentTimeMs) {
super(Type.ASSIGNMENT_CHANGE);
this.offsets = offsets;
this.currentTimeMs = currentTimeMs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;

public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {

public NewTopicsMetadataUpdateRequestEvent() {
super(Type.METADATA_UPDATE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private List<CompletableFuture<ClientResponse>> assertPoll(
NetworkClientDelegate.PollResult res = manager.poll(time.milliseconds());
assertEquals(numRes, res.unsentRequests.size());

return res.unsentRequests.stream().map(r -> r.future()).collect(Collectors.toList());
return res.unsentRequests.stream().map(NetworkClientDelegate.UnsentRequest::future).collect(Collectors.toList());
}

private CommitRequestManager create(final boolean autoCommitEnabled, final long autoCommitInterval) {
Expand Down
Loading