@@ -24,11 +24,11 @@ public class UnknownTopologyException extends StreamsException {
     private static final long serialVersionUID = 1L;

     public UnknownTopologyException(final String message, final String namedTopology) {
-        super(message + "due to being unable to locate a Topology named " + namedTopology);
+        super(message + " due to being unable to locate a Topology named " + namedTopology);
     }

     public UnknownTopologyException(final String message, final Throwable throwable, final String namedTopology) {
-        super(message + "due to being unable to locate a Topology named " + namedTopology, throwable);
+        super(message + " due to being unable to locate a Topology named " + namedTopology, throwable);
     }

 }
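
To illustrate the one-character fix, here is a minimal standalone sketch (the message prefix and topology name are hypothetical, not taken from the PR):

    // Hypothetical snippet, only to show the effect of the missing space:
    public class MessageDemo {
        public static void main(final String[] args) {
            final String message = "Unable to process tasks";
            final String namedTopology = "topology_A";
            // before the fix:
            System.out.println(message + "due to being unable to locate a Topology named " + namedTopology);
            // prints: Unable to process tasksdue to being unable to locate a Topology named topology_A
            // after the fix:
            System.out.println(message + " due to being unable to locate a Topology named " + namedTopology);
            // prints: Unable to process tasks due to being unable to locate a Topology named topology_A
        }
    }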
@@ -932,7 +932,8 @@ private long pollPhase() {
                     .ifPresent(t -> taskManager.updateTaskEndMetadata(topicPartition, t.offset()));
         }

-        log.debug("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, numRecords);
+        log.debug("Main Consumer poll completed in {} ms and fetched {} records from partitions {}",
+            pollLatency, numRecords, records.partitions());

         pollSensor.record(pollLatency, now);
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {

    [Member Author]: Not sure whether it is or will be cleaner in the long run to have this separate class that now has to keep up with topology additions/removals, vs. just doing all this bookkeeping inside the TopologyMetadata/InternalTopologyBuilder classes -- but until we can carve out time for a real tech-debt cleanup of those classes, which are already pretty out of control, I felt it best to pull everything out even if it meant duplicated un/registration of topologies.

    [Contributor]: Thanks for bringing this up. I think we can come back and clean this up after we've gained confidence and are ready to extend beyond named topologies later.

+    private final boolean hasNamedTopologies;
+    // map of topologies experiencing errors/currently under backoff
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+    }
+
+    public boolean canProcessTask(final Task task, final long now) {
+        final String topologyName = task.id().topologyName();
+        if (!hasNamedTopologies) {
+            // TODO implement error handling/backoff for non-named topologies (needs KIP)
+            return true;
+        } else {
+            final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+            return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+        }
+    }
+
+    public void registerTaskError(final Task task, final Throwable t, final long now) {
+        if (hasNamedTopologies) {
+            final String topologyName = task.id().topologyName();
+            topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName))

    [Member Author]: Changed it so that we only track topologies in the map here if they have an active backoff/task in error, rather than registering and unregistering named topologies and trying to keep this in sync between the TopologyMetadata and the individual StreamThreads' view (which was starting to look pretty ugly).

    Instead we just pop the topology's metadata into the map when one of its tasks hits a new error, and clear it if/when all tasks are healthy again.

+                .registerTaskError(task, t, now);
+        }
+    }
+
+    class NamedTopologyMetadata {
+        private final Logger log;
+        private final Map<TaskId, Long> tasksToErrorTime = new ConcurrentHashMap<>();
+
+        public NamedTopologyMetadata(final String topologyName) {
+            final LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName));
+            this.log = logContext.logger(NamedTopologyMetadata.class);
+        }
+
+        public boolean canProcess() {
+            // TODO: during long task backoffs, pause the full topology to avoid it getting out of sync
+            return true;
+        }
+
+        public boolean canProcessTask(final Task task, final long now) {
+            // TODO: implement exponential backoff, for now we just wait 15s
+            final Long errorTime = tasksToErrorTime.get(task.id());
+            if (errorTime == null) {
+                return true;
+            } else if (now - errorTime > 15000L) {

    [Contributor]: Curious why the magic number of 15s?

    [Member Author (ableegoldman), Feb 26, 2022]: Because it was actually taking the thread 10s to come back up (in the integration test where we overrode session.timeout to 10s) before we had #11801.

    Now with that fix it takes 0.5-4s for the thread to be replaced, so there's no particular reason to have it be 15s. I think it makes sense to lower it to maybe 5s for now, and then when we have the true exponential backoff it can obviously start lower and grow from there.

log.info("End backoff for task {} at t={}", task.id(), now);
tasksToErrorTime.remove(task.id());
if (tasksToErrorTime.isEmpty()) {
topologyNameToErrorMetadata.remove(task.id().topologyName());
}
return true;
} else {
log.debug("Skipping processing for unhealthy task {} at t={}", task.id(), now);
return false;
}
}

public synchronized void registerTaskError(final Task task, final Throwable t, final long now) {
log.info("Begin backoff for unhealthy task {} at t={}", task.id(), now);
tasksToErrorTime.put(task.id(), now);
}
}
}
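
The review thread above treats the 15s constant as a placeholder for a real exponential backoff. As a minimal standalone sketch of what that could look like (the constants, class name, and String-keyed map are assumptions for illustration, not from the PR):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the exponential backoff discussed above: the wait doubles with each
    // consecutive error, starting at 5s (the value floated in the review) and capped
    // at 60s. All constants and names here are hypothetical.
    class TaskBackoffSketch {
        private static final long INITIAL_BACKOFF_MS = 5_000L;
        private static final long MAX_BACKOFF_MS = 60_000L;

        // task id -> [time of last error, consecutive error count]
        private final Map<String, long[]> errorState = new ConcurrentHashMap<>();

        void registerError(final String taskId, final long now) {
            errorState.merge(taskId, new long[] {now, 1},
                (old, fresh) -> new long[] {now, old[1] + 1});
        }

        boolean canProcess(final String taskId, final long now) {
            final long[] state = errorState.get(taskId);
            if (state == null) {
                return true;  // no recorded errors, no backoff
            }
            // backoff = INITIAL * 2^(errors - 1); the shift is capped to avoid overflow
            final long backoff = Math.min(MAX_BACKOFF_MS,
                INITIAL_BACKOFF_MS << Math.min(30, (int) (state[1] - 1)));
            return now - state[0] > backoff;
        }

        // called after a successful pass, mirroring how the PR clears tasksToErrorTime
        void markHealthy(final String taskId) {
            errorState.remove(taskId);
        }
    }

A caller would check canProcess before handing the task its records, call registerError on the failure path, and call markHealthy after a clean pass so the counter resets.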
@@ -51,12 +51,15 @@ public class TaskExecutor {
     private final boolean hasNamedTopologies;
     private final ProcessingMode processingMode;
     private final Tasks tasks;
+    private final TaskExecutionMetadata taskExecutionMetadata;

     public TaskExecutor(final Tasks tasks,
+                        final TaskExecutionMetadata taskExecutionMetadata,
                         final ProcessingMode processingMode,
                         final boolean hasNamedTopologies,
                         final LogContext logContext) {
         this.tasks = tasks;
+        this.taskExecutionMetadata = taskExecutionMetadata;
         this.processingMode = processingMode;
         this.hasNamedTopologies = hasNamedTopologies;
         this.log = logContext.logger(getClass());

@@ -69,23 +72,28 @@ public TaskExecutor(final Tasks tasks,
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
         Task lastProcessed = null;
-        try {
-            for (final Task task : tasks.activeTasks()) {
-                lastProcessed = task;
-                totalProcessed += processTask(task, maxNumRecords, time);
+        for (final Task task : tasks.activeTasks()) {
+            final long now = time.milliseconds();
+            try {
+                if (taskExecutionMetadata.canProcessTask(task, now)) {
+                    lastProcessed = task;
+                    totalProcessed += processTask(task, maxNumRecords, now, time);
+                }
+            } catch (final Throwable t) {
+                taskExecutionMetadata.registerTaskError(task, t, now);
+                tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
+                commitSuccessfullyProcessedTasks();
+                throw t;
+            }
-        } catch (final Exception e) {
-            tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
-            commitSuccessfullyProcessedTasks();
-            throw e;
         }

         return totalProcessed;
     }

-    private long processTask(final Task task, final int maxNumRecords, final Time time) {
+    private long processTask(final Task task, final int maxNumRecords, final long begin, final Time time) {
         int processed = 0;
-        long now = time.milliseconds();
+        long now = begin;

         final long then = now;
         try {

@@ -94,12 +102,14 @@ private long processTask(final Task task, final int maxNumRecords, final Time time) {
                 processed++;
             }
             // TODO: enable regardless of whether using named topologies
-            if (hasNamedTopologies && processingMode != EXACTLY_ONCE_V2) {
+            if (processed > 0 && hasNamedTopologies && processingMode != EXACTLY_ONCE_V2) {
                 log.trace("Successfully processed task {}", task.id());
                 tasks.addToSuccessfullyProcessed(task);
             }
         } catch (final TimeoutException timeoutException) {
             // TODO consolidate TimeoutException retries with general error handling
             task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
-            log.debug(
+            log.error(
                 String.format(
                     "Could not complete processing records for %s due to the following exception; will move to next task and retry later",
                     task.id()),

@@ -110,11 +120,11 @@ private long processTask(final Task task, final int maxNumRecords, final Time time) {
"Will trigger a new rebalance and close all tasks as zombies together.", task.id());
throw e;
} catch (final StreamsException e) {
log.error("Failed to process stream task {} due to the following error:", task.id(), e);
log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e);

    [Contributor]: Sorry to see that log4j still hasn't figured out a way to present both string parameters and an exception together.

             e.setTaskId(task.id());
             throw e;
         } catch (final RuntimeException e) {
-            log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+            log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e);
             throw new StreamsException(e, task.id());
         } finally {
             now = time.milliseconds();

@@ -132,7 +142,7 @@ private long processTask(final Task task, final int maxNumRecords, final Time time) {
      * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
      */
     int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
-                                                        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
+                                                    final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
         int committed = 0;
         for (final Task task : tasksToCommit) {
             // we need to call commitNeeded first since we need to update committable offsets
@@ -253,7 +263,7 @@ private void commitSuccessfullyProcessedTasks() {
         if (!tasks.successfullyProcessed().isEmpty()) {
             log.info("Streams encountered an error when processing tasks." +
                 " Will commit all previously successfully processed tasks {}",
-                tasks.successfullyProcessed().toString());
+                tasks.successfullyProcessed().stream().map(Task::id));
             commitTasksAndMaybeUpdateCommittableOffsets(tasks.successfullyProcessed(), new HashMap<>());
         }
         tasks.clearSuccessfullyProcessed();
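
To illustrate the contract the executor now leans on, here is a hypothetical Mockito-based sketch (not a test from this PR; it assumes Mockito is on the classpath and that TaskId has the (topicGroupId, partition, topologyName) constructor): an error in topology_A backs off only that topology's task, while topology_B keeps processing.

    import static org.apache.kafka.common.utils.Utils.mkSet;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.internals.Task;
    import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;

    public class TaskExecutionMetadataSketch {
        public static void main(final String[] args) {
            final TaskExecutionMetadata metadata =
                new TaskExecutionMetadata(mkSet("topology_A", "topology_B"));

            final Task taskA = mock(Task.class);
            when(taskA.id()).thenReturn(new TaskId(0, 0, "topology_A"));
            final Task taskB = mock(Task.class);
            when(taskB.id()).thenReturn(new TaskId(0, 0, "topology_B"));

            // topology_A's task hits an error at t=0 and goes under backoff
            metadata.registerTaskError(taskA, new RuntimeException("Kaboom"), 0L);

            System.out.println(metadata.canProcessTask(taskA, 1_000L));  // false: still backing off
            System.out.println(metadata.canProcessTask(taskB, 1_000L));  // true: other topology unaffected

            // once the (currently hard-coded) 15s window has passed, the task is retried
            System.out.println(metadata.canProcessTask(taskA, 16_000L)); // true
        }
    }

In the executor loop above, canProcessTask gates processTask and registerTaskError runs in the catch block, so a poison-pill task in one named topology no longer starves the others.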
@@ -116,7 +116,13 @@ public class TaskManager {
         this.log = logContext.logger(getClass());

         this.tasks = new Tasks(logContext, topologyMetadata, streamsMetrics, activeTaskCreator, standbyTaskCreator);
-        this.taskExecutor = new TaskExecutor(tasks, processingMode, topologyMetadata.hasNamedTopologies(), logContext);
+        this.taskExecutor = new TaskExecutor(
+            tasks,
+            topologyMetadata.taskExecutionMetadata(),
+            processingMode,
+            topologyMetadata.hasNamedTopologies(),
+            logContext
+        );
     }

     void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
@@ -69,6 +69,7 @@ public class TopologyMetadata {
     private final StreamsConfig config;
     private final ProcessingMode processingMode;
     private final TopologyVersion version;
+    private final TaskExecutionMetadata taskExecutionMetadata;

     private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

@@ -96,8 +97,8 @@ public TopologyVersionWaiters(final long topologyVersion, final KafkaFutureImpl<

     public TopologyMetadata(final InternalTopologyBuilder builder,
                             final StreamsConfig config) {
-        version = new TopologyVersion();
-        processingMode = StreamsConfigUtils.processingMode(config);
+        this.version = new TopologyVersion();
+        this.processingMode = StreamsConfigUtils.processingMode(config);
         this.config = config;

         builders = new ConcurrentSkipListMap<>();

@@ -106,6 +107,7 @@ public TopologyMetadata(final InternalTopologyBuilder builder,
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
+        this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
     }

     public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,

@@ -115,11 +117,11 @@ public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,
         this.config = config;
         this.log = LoggerFactory.getLogger(getClass());

-
         this.builders = builders;
         if (builders.isEmpty()) {
             log.info("Created an empty KafkaStreams app with no topology");
         }
+        this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
     }

     // Need to (re)set the log here to pick up the `processId` part of the clientId in the prefix

@@ -160,6 +162,10 @@ public void unregisterThread(final String threadName) {
         maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
     }

+    public TaskExecutionMetadata taskExecutionMetadata() {
+        return taskExecutionMetadata;
+    }
+
     public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String threadName) {
         try {
             lock();
@@ -30,8 +30,6 @@
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
-import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;

@@ -47,14 +45,11 @@
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;

 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;

 @Category(IntegrationTest.class)
 public class EmitOnChangeIntegrationTest {

@@ -177,77 +172,4 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
             );
         }
     }
-
-    @Test
-    public void shouldEmitRecordsAfterFailures() throws Exception {
-        final Properties properties = mkObjectProperties(
-            mkMap(
-                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
-                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
-                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
-                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
-                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
-                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
-                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
-            )
-        );
-
-        try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
-            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-
-            final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
-            final AtomicInteger noOutputExpected = new AtomicInteger(0);
-            final AtomicInteger twoOutputExpected = new AtomicInteger(0);
-            builder.stream(inputTopic2).peek((k, v) -> twoOutputExpected.incrementAndGet()).to(outputTopic2);
-            builder.stream(inputTopic)
-                .peek((k, v) -> {
-                    throw new RuntimeException("Kaboom");
-                })
-                .peek((k, v) -> noOutputExpected.incrementAndGet())
-                .to(outputTopic);
-
-            kafkaStreams.addNamedTopology(builder.build());
-
-            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic2,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    StringDeserializer.class
-                ),
-                outputTopic2,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                )
-            );
-            assertThat(noOutputExpected.get(), equalTo(0));
-            assertThat(twoOutputExpected.get(), equalTo(2));
-        }
-    }
 }