Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -60,6 +61,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
private InternalProcessorContext processorContext;
private final int retries;
private final long retryBackoffMs;
private final Duration pollTime;

public GlobalStateManagerImpl(final LogContext logContext,
final ProcessorTopology topology,
Expand All @@ -76,6 +78,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
this.stateRestoreListener = stateRestoreListener;
this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will return the value for "poll.ms" -- Should we check for "global.consumer.poll.ms" and user this value if present instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. poll.ms is not a ConsumerConfig, but a StreamsConfig, so users are not expected to prefix it with the consumer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. Still I am wondering if a single config for all consumers is smart. Also, the config is only used to pass into Consumer#poll() -- so even if it's not a config parameter, it's effectively still very similar to a "consumer config". I agree that it would required explicit documentation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it is generally better to allow different values for this; on the other hand, I felt it is too much of a lower-level details for people to really figure it out right. For example, today global consumers have two callers of poll(), restore consumer has two as well, and normal consumer has one. We can argue further that all these five calls may prefer different values for the reason we discussed in this PR, like whether it is call in the main loop or not.

Besides, if want to allow prefix for this config we can rush it into 2.0, but it may be considered a public API change as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. I just convinced myself that it makes sense what you say. As you pointed out, we call poll() on different consumers with different purpose:

normale processing:

  • main-consumer
  • global-consumer (regular g-table update)

updating standby tasks:

  • restore-consumer

restoring:

  • main-consumer (that is still doing regular work)
  • restore-consumer
  • global-consumer

Applying the same value poll.ms to main and global consumer in normal processing case makes sense to me. For updating standby-tasks, we hard-code "zero" already. Same for the main-consumer during restore phase, to speed up restoring. Both are reasonable choices. The last case, is the restore case for restore-consumer and global-consumer -- it might make sense to apply a different poll-time for thiscase, but I also think that using the same as for the normal processing case is fine.

Thus, long story short, iff we ever introduce a prefix for poll.ms parameters, it should be for "normal processing phase" and "restore phase" but should not be tied to the actual used consumer-instance. Thus, using the consumer prefix does not make sense.

}

@Override
Expand Down Expand Up @@ -262,7 +265,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,

while (offset < highWatermark) {
try {
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : records) {
if (record.key() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -200,7 +201,7 @@ static class StateConsumer {
private final Consumer<byte[], byte[]> globalConsumer;
private final GlobalStateMaintainer stateMaintainer;
private final Time time;
private final long pollMs;
private final Duration pollTime;
private final long flushInterval;
private final Logger log;

Expand All @@ -210,13 +211,13 @@ static class StateConsumer {
final Consumer<byte[], byte[]> globalConsumer,
final GlobalStateMaintainer stateMaintainer,
final Time time,
final long pollMs,
final Duration pollTime,
final long flushInterval) {
this.log = logContext.logger(getClass());
this.globalConsumer = globalConsumer;
this.stateMaintainer = stateMaintainer;
this.time = time;
this.pollMs = pollMs;
this.pollTime = pollTime;
this.flushInterval = flushInterval;
}

Expand All @@ -235,7 +236,7 @@ void initialize() {

void pollAndUpdate() {
try {
final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollTime);
for (final ConsumerRecord<byte[], byte[]> record : received) {
stateMaintainer.update(record);
}
Expand Down Expand Up @@ -338,8 +339,9 @@ private StateConsumer initialize() {
logContext
),
time,
config.getLong(StreamsConfig.POLL_MS_CONFIG),
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
);
stateConsumer.initialize();

return stateConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -49,11 +50,14 @@ public class StoreChangelogReader implements ChangelogReader {
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
private final Duration pollTime;

public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
final Duration pollTime,
final StateRestoreListener userStateRestoreListener,
final LogContext logContext) {
this.restoreConsumer = restoreConsumer;
this.pollTime = pollTime;
this.log = logContext.logger(getClass());
this.userStateRestoreListener = userStateRestoreListener;
}
Expand All @@ -76,7 +80,7 @@ public Collection<TopicPartition> restore(final RestoringTasks active) {
}

try {
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(10);
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(pollTime);
final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
while (iterator.hasNext()) {
final TopicPartition partition = iterator.next();
Expand Down Expand Up @@ -295,6 +299,7 @@ private boolean hasPartition(final TopicPartition topicPartition) {
return true;
}
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -212,7 +213,7 @@ State setState(final State newState) {
if (newState == State.RUNNING) {
updateThreadMetadata(taskManager.activeTasks(), taskManager.standbyTasks());
} else {
updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
Copy link
Member

@mjsax mjsax Jun 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, Java8! :)

}
}

Expand Down Expand Up @@ -555,7 +556,7 @@ static class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
}

private final Time time;
private final long pollTimeMs;
private final Duration pollTime;
private final long commitTimeMs;
private final Object stateLock;
private final Logger log;
Expand Down Expand Up @@ -602,7 +603,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
log.info("Creating restore consumer client");
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above: should we check for "restore.consumer.poll.ms" ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess not, following Guozhang's comment.

final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, pollTime, userStateRestoreListener, logContext);

Producer<byte[], byte[]> threadProducer = null;
final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
Expand Down Expand Up @@ -710,10 +712,10 @@ public StreamThread(final Time time,
this.originalReset = originalReset;
this.versionProbingFlag = versionProbingFlag;

this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above: "main.consumer.poll.ms" ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess not, following Guozhang's comment.

this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);

updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
}

/**
Expand Down Expand Up @@ -801,14 +803,14 @@ long runOnce(final long recordsProcessedBeforeCommit) {
if (state == State.PARTITIONS_ASSIGNED) {
// try to fetch some records with zero poll millis
// to unblock the restoration as soon as possible
records = pollRequests(0L);
records = pollRequests(Duration.ZERO);

if (taskManager.updateNewAndRestoringTasks()) {
setState(State.RUNNING);
}
} else {
// try to fetch some records if necessary
records = pollRequests(pollTimeMs);
records = pollRequests(pollTime);

// if state changed after the poll call,
// try to initialize the assigned tasks again
Expand Down Expand Up @@ -843,15 +845,15 @@ long runOnce(final long recordsProcessedBeforeCommit) {
/**
* Get the next batch of records by polling.
*
* @param pollTimeMs poll time millis parameter for the consumer poll
* @param pollTime how long to block in Consumer#poll
* @return Next batch of records or null if no records available.
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
ConsumerRecords<byte[], byte[]> records = null;

try {
records = consumer.poll(pollTimeMs);
records = consumer.poll(pollTime);
} catch (final InvalidOffsetException e) {
resetInvalidOffsets(e);
}
Expand Down Expand Up @@ -1078,7 +1080,11 @@ private void maybeUpdateStandbyTasks(final long now) {
}

try {
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
// poll(0): Since this is during the normal processing, not during restoration.
// We can afford to have slower restore (because we don't wait inside poll for results).
// Instead, we want to proceed to the next iteration to call the main consumer#poll()
// as soon as possible so as to not be kicked out of the group.
final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(Duration.ZERO);

if (!records.isEmpty()) {
for (final TopicPartition partition : records.partitions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
Expand All @@ -25,6 +27,7 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
Expand All @@ -42,7 +45,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand Down Expand Up @@ -234,26 +236,35 @@ public boolean conditionMet() {
assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
}

@Ignore // this test cannot pass as long as GST blocks KS.start()
@Test
public void testGlobalThreadCloseWithoutConnectingToBroker() {
public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);

// We want to configure request.timeout.ms, but it must be larger than a
// few other configs.
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 200);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 201);
props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 202);

final StreamsBuilder builder = new StreamsBuilder();
// make sure we have the global state thread running too
builder.globalTable("anyTopic");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
streams.close();
try {
streams.start();
fail("expected start() to time out and throw an exception.");
} catch (final StreamsException expected) {
// This is a result of not being able to connect to the broker.
}
// There's nothing to assert... We're testing that this operation actually completes.
}

@Ignore // this test cannot pass until we implement KIP-266
@Test
public void testLocalThreadCloseWithoutConnectingToBroker() {
final Properties props = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -464,7 +465,7 @@ private static <K, V> List<ConsumerRecord<K, V>> readRecords(final String topic,
while (totalPollTimeMs < waitTime &&
continueConsuming(consumerRecords.size(), maxMessages)) {
totalPollTimeMs += pollIntervalMs;
final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(pollIntervalMs));

for (final ConsumerRecord<K, V> record : records) {
consumerRecords.add(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.streams.state.WindowStore;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -334,7 +335,7 @@ private void consumeAndProduce(final String topic) {
consumer.seekToBeginning(partitions);

while (true) {
final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for sanity checking, maybe we should run SimpleBenchmark system tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I definitely think we should run all the tests we can before the 2.0 release.

if (records.isEmpty()) {
if (processedRecords == numRecords) {
break;
Expand Down Expand Up @@ -372,7 +373,7 @@ private void consume(final String topic) {
consumer.seekToBeginning(partitions);

while (true) {
final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
final ConsumerRecords<Integer, byte[]> records = consumer.poll(Duration.ofMillis(POLL_MS));
if (records.isEmpty()) {
if (processedRecords == numRecords) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -233,7 +234,7 @@ private AbstractTask createTask(final Consumer consumer,
storeTopicPartitions,
ProcessorTopology.withLocalStores(new ArrayList<>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics),
consumer,
new StoreChangelogReader(consumer, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
new StoreChangelogReader(consumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("stream-task-test ")),
false,
stateDirectory,
config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -122,7 +123,12 @@ private StreamsConfig createConfig(final File baseDir) throws IOException {

private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new LogContext("standby-task-test "));
private final StoreChangelogReader changelogReader = new StoreChangelogReader(
restoreStateConsumer,
Duration.ZERO,
stateRestoreListener,
new LogContext("standby-task-test ")
);

private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
Expand Down Expand Up @@ -188,7 +194,7 @@ public void testUpdate() throws IOException {
}

restoreStateConsumer.seekToBeginning(partition);
task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
task.update(partition2, restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2));

StandbyContextImpl context = (StandbyContextImpl) task.context();
MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
Expand Down Expand Up @@ -245,7 +251,7 @@ public void testUpdateKTable() throws IOException {
}

// The commit offset is at 0L. Records should not be processed
List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(Duration.ofMillis(100)).records(globalTopicPartition));
assertEquals(5, remaining.size());

committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
Expand Down
Loading